source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py @ 5243

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py@5243
Revision 5243, 10.2 KB checked in by cbyrom, 11 years ago (diff)

Adjust logging and output of error in ingest scripts.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Class to ingest documents from the eXist DB - by polling its feed and doing
4 ingests when updates are detected
5 @author: C Byrom, Tessella Jan 09
6'''
7import os, sys, logging, time, feedparser, datetime
8from ndg.common.src.lib.utilities import formatDateYYYYMMDD, YEAR_FORMAT
9import ndg.common.src.lib.fileutilities as FileUtilities
10from ndg.common.src.clients.xmldb.eXist.feedclient import FeedClient as feedClient
11from abstractdocumentingester import AbstractDocumentIngester
12from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
13from ndg.common.src.lib.atomutilities import *
14from ndg.common.src.models.ndgObject import ndgObject
15from ndg.common.src.clients.xmldb.eXist.searchclient import SearchClient
16import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
17
class FeedDocumentIngester(AbstractDocumentIngester):
    '''
    Class to handle the ingest of files from an eXist feed - the feed is
    checked every pollInterval seconds and any new DIF entries, updated on or
    after the 'ingest from' date, are retrieved from eXist and ingested
    '''
    INTERVAL_ERROR = "Interval, '%s', is not a float or an int"

    # interval, in seconds, with which to check the feed for updates
    pollInterval = 300.0

    # updated time from which feed entries will be ingested - NB, feedparser
    # normalises its *_parsed dates to UTC, so UTC is used here too; __init__
    # resets this per instance so the default is the ingester's creation time
    ingestFromDate = datetime.datetime.utcnow()

    # temp dir to create docs for running transforms on
    TEMP_DIR_NAME = "feedPollTempDir"

    def __getDBConnection(self, dBHostname, configFileName):
        '''
        Set up the connection to the eXist DB that feed docs are retrieved
        from - using the DB details read in from the config file
        @param dBHostname: the hostname of the eXist DB to use for the ingest
        @param configFileName: name of config file to get DB details from
        '''
        logging.info("Setting up connection to eXist DB")
        self.dbClient = SearchClient(dbHostName = dBHostname,
                                     configFileName = configFileName)
        logging.info("Connection to eXist now set up")

    def setPollInterval(self, interval):
        '''
        Set the polling time interval
        @param interval: an int or float - or a value convertible to a float,
        e.g. a numeric string from the command line - giving the time, in
        seconds, between checks of the atom feed for updates
        @raise ValueError: if interval cannot be interpreted as a number
        '''
        if not isinstance(interval, (int, float)):
            try:
                interval = float(interval)
            except (TypeError, ValueError):
                # report with this class's own message rather than float()'s
                raise ValueError(self.INTERVAL_ERROR % (interval,))
        self.pollInterval = interval

    def setIngestFromDate(self, ingestFromDate):
        '''
        Set the date from which feed entries should be ingested
        @param ingestFromDate: date from which documents should be ingested - in
        format YYYY-MM-DD; if formatDateYYYYMMDD cannot interpret it (returns a
        false value) a warning is logged and the ingest date is left unchanged
        '''
        ingestDate = formatDateYYYYMMDD(ingestFromDate)
        if not ingestDate:
            logging.warning("Could not interpret ingest date, '%s' - ingest "
                            "date left as '%s'"
                            % (ingestFromDate, self.ingestFromDate))
            return
        self.ingestFromDate = datetime.datetime.strptime(ingestDate,
                                                         YEAR_FORMAT)

    def ingestDIFDocument(self, browseURL):
        '''
        Retrieve the DIF identified by the browseURL from eXist, then ingest
        the document into postgres
        - NB, if a data centre to poll for has been set, DIFs from any other
        data centre are skipped
        @param browseURL: browse url identifying the doc to retrieve
        '''
        logging.info("Retrieving Atom document from url: '%s'" %browseURL)

        # get the required info out of the URL
        ndgURI = browseURL.split(VTD.BROWSE_STEM_URL)[-1]
        no = ndgObject(ndgURI)

        # set up things for the provider of the data
        dataCentre = no.repository.split('.')[0]

        # check if we're to ingest data from this data centre
        if self.dataCentrePoll and dataCentre != self.dataCentrePoll:
            logging.info("DIF doc is from a different data centre ('%s') to "
                         "that being polled for ('%s') - skipping"
                         % (dataCentre, self.dataCentrePoll))
            return

        # only bother reloading the data centre info if the data centre changed
        if not self.dataCentre or self.dataCentre != dataCentre:
            self.getConfigDetails(dataCentre)

        self.dataCentre = dataCentre
        self._setupDataCentreDirs()

        doc = self.dbClient.getNDGDoc(no.repository, no.schema, no.localID,
                                      targetCollection=dc.DIF_COLLECTION_PATH)

        # create a temporary file to allow the ingest to do the various
        # xquery transforms on
        difFilename = self.originals_dir + no.localID + '.dif'
        FileUtilities.createFile(difFilename, doc)

        self._convertAndIngestFiles(self.originals_dir, self.discovery_dir)

        logging.info("Document fully ingested into discovery service")

    def __setupFeedClient(self):
        '''
        Set up a feed client - to allow use of RESTful interface to retrieve
        atoms referenced in the feed
        '''
        self.feedClient = feedClient("", eXistDBHostname = self.eXistDBHostname,
                                     eXistPortNo = self.eXistPortNo)

    def processFeed(self, feedUrl):
        '''
        Check the atom feed once and ingest any new data found
        - NB, the feed is only read if the time set in self.refreshTime has
        passed; otherwise this call does nothing. After a read, the next
        refresh is scheduled pollInterval seconds later
        @param feedUrl: url to the feed to poll
        '''
        if time.time() <= self.refreshTime:
            return

        logging.info("- polling for new data...")
        feed = feedparser.parse(feedUrl)

        # entries beyond the count seen on the previous poll are treated as new
        newEntryNumber = len(feed.entries)
        if newEntryNumber > self.entryNumber:
            logging.info("New data found - checking if this has been published after specified ingest date...")

            for entry in feed.entries[self.entryNumber:newEntryNumber]:
                if self.__isEntryIngestible(entry):
                    self.ingestDIFDocument(entry.content[0].src)

            self.entryNumber = newEntryNumber

        self.refreshTime = time.time() + self.pollInterval
        logging.info("- finished polling for new data")

    def pollFeed(self, feedUrl):
        '''
        Poll the atom feed and ingest any new data found
        NB, this is an infinite loop to allow permanent polling
        @param feedUrl: url to the feed to poll
        '''
        self.refreshTime = 0.
        self.entryNumber = 0

        logging.info("Start polling for atom feed updates")
        while True:
            self.processFeed(feedUrl)
            # processFeed does nothing until refreshTime, so sleep until then
            # rather than spinning on time.time() at full CPU
            time.sleep(max(0., self.refreshTime - time.time()))

    def __isEntryIngestible(self, entry):
        '''
        Check if a feed entry is valid for ingestion
        - to be ingested, the entry must represent a DIF record and must have
        been updated on or after the specified 'ingest from' date
        @param entry: feedparser entry from a feed to check
        @return True, if entry should be ingested, False otherwise
        '''
        # firstly, check the entry is a DIF record
        if entry.title.startswith(feedClient.DIF_ENTRY_TITLE):
            logging.info("- entry represents a DIF record - now check its date")

            # now check the entry was updated on/after the specified ingest
            # date - compare full timestamps: comparing only the day would
            # reject entries updated later on the same day as an ingest date
            # that carries a time (e.g. the default, the current time)
            updated = getattr(entry, 'updated_parsed', None)
            if updated is None:
                logging.info("- entry has no updated date")
            elif datetime.datetime(*updated[0:6]) >= self.ingestFromDate:
                logging.info("- data is new - so valid for ingesting")
                return True
            else:
                logging.info("- data is before the ingest time period")
        else:
            logging.info("- entry doesn't represent a DIF record")
        logging.info("- data is not valid for ingesting")
        return False

    def usage(self):
        '''
        Display input params for the script, then exit with status 2
        '''
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
        print("Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]")
        print("              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]")
        print(" - where:\n <feed> is the atom feed to ingest data from; options are:")
        print(" -v - verbose mode for output logging")
        print(" -d - debug mode for output logging")
        print(" \nand keywords are:")
        print(" interval - interval, in seconds, at which to retrieve data from the feed")
        print(" ingestFromDate - date, in format, 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time")
        print(" eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'")
        print(" eXistPortNo - port number used by the eXist DB - defaults to '8080'")
        print(" dataCentrePoll - data centre whose documents should be polled for - e.g 'badc', 'neodc' - if not set, all documents on a feed will be ingested")
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
        sys.exit(2)

    def __init__(self, interval = None, ingestFromDate = None,
                 eXistDBHostname = 'chinook.badc.rl.ac.uk',
                 eXistPortNo = '8080',
                 configFileName = 'passwords.txt',
                 dataCentrePoll = None):
        '''
        Set up object
        @keyword interval: polling interval, in seconds, to set the feed polling to
        - default is 300s
        @keyword ingestFromDate: date from which to ingest docs - i.e. do not ingest
        any feed entries updated before this date - in format, YYYY-MM-DD; if not
        set, the (UTC) time at which this object is created is used
        @keyword eXistDBHostname: name of eXist DB to use - defaults to
        'chinook.badc.rl.ac.uk'
        @keyword eXistPortNo: port number that the eXist DB is exposed by - defaults
        to '8080' if not set
        @keyword configFileName: password file to use - NB, this should
        have contents of format (NB, space delimiter):
        dbName userID password
        Default is 'passwords.txt'.
        @keyword dataCentrePoll: to specify a datacentre to poll data for - e.g.
        'badc' or 'neodc' - if not set, all data on the specified poll url will be retrieved
        '''
        self.dataCentre = None

        self.dataCentrePoll = dataCentrePoll

        if interval:
            self.setPollInterval(interval)

        # default to ingesting entries updated from now on - set per instance,
        # as the class attribute is fixed when the module is imported
        self.ingestFromDate = datetime.datetime.utcnow()
        if ingestFromDate:
            self.setIngestFromDate(ingestFromDate)

        self.eXistDBHostname = eXistDBHostname
        self.eXistPortNo = eXistPortNo
        self.configFileName = configFileName
        self.refreshTime = 0.
        self.entryNumber = 0
        self._no_files_ingested = 0
        self._no_problem_files = 0
        self._error_messages = ''

        self.deName = VTD().TERM_DATA[VTD.DE_TERM].title

        self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from

    def runPoll(self, feedURL):
        '''
        Set up the feed poll and start it running - NB, this does not return
        @param feedURL: url to poll
        - NB, the feedURL will typically be something like
        http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF
        '''
        self.setupPoll()
        self.pollFeed(feedURL)

    def setupPoll(self):
        '''
        Set up everything ready to run the polling client - the postgres and
        eXist DB connections and the xqueries
        '''
        # NB, could get the docs via RESTful interface in feed client
        # (__setupFeedClient); currently doing this via the document retrieve
        # xmlrpc interface
        self._getPostgresDBConnection()
        self.__getDBConnection(self.eXistDBHostname, self.configFileName)
        self._setupXQueries()
if __name__=="__main__":
    print("=================================")
    print("RUNNING: FeedDocumentIngester.py")
    ingester = FeedDocumentIngester()
    args = ingester._setupCmdLineOptions()
    # the feed url is a required positional argument - without it, show the
    # usage message (usage() exits) instead of failing with an IndexError
    if not args:
        ingester.usage()
    ingester.runPoll(args[0])
Note: See TracBrowser for help on using the repository browser.