source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py @ 5584

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py@5584
Revision 5584, 10.4 KB checked in by sdonegan, 11 years ago (diff)

Updated feed handling to use new reporting methods as developed for conventional ingestion

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Class to ingest documents from the eXist DB - by polling its feed and doing
4 ingests when updates are detected
5 @author: C Byrom, Tessella Jan 09
6'''
7import os, sys, logging, time, feedparser, datetime
8from ndg.common.src.lib.utilities import formatDateYYYYMMDD, YEAR_FORMAT
9import ndg.common.src.lib.fileutilities as FileUtilities
10from ndg.common.src.clients.xmldb.eXist.feedclient import FeedClient as feedClient
11from abstractdocumentingester import AbstractDocumentIngester
12from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
13from ndg.common.src.lib.atomutilities import *
14from ndg.common.src.models.ndgObject import ndgObject
15from ndg.common.src.clients.xmldb.eXist.searchclient import SearchClient
16import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
17
class FeedDocumentIngester(AbstractDocumentIngester):
    '''
    Class to handle the ingest of files from an eXist feed

    The feed is polled at a fixed interval; each new feed entry which
    represents a DIF record updated after the 'ingest from' date is
    retrieved from the eXist DB, written to the data centre's originals
    directory and passed to self._convertAndIngestFiles (not defined in
    this class) for ingest.
    '''
    INTERVAL_ERROR = "Interval, '%s', is not a float or an int"

    # interval, in seconds, with which to check the feed for updates
    pollInterval = 300.0

    # published time from which feed entries will be ingested
    # NB, this is evaluated once, when the class body is executed - so the
    # default is the time the module was loaded, not the time an instance
    # was created
    ingestFromDate = datetime.datetime.now()

    # temp dir to create docs for running transforms on
    TEMP_DIR_NAME = "feedPollTempDir"

    def __init__(self, interval = None, ingestFromDate = None,
                 eXistDBHostname = 'chinook.badc.rl.ac.uk',
                 eXistPortNo = '8080',
                 configFileName = 'passwords.txt',
                 dataCentrePoll = None):
        '''
        Set up object
        @keyword interval: polling interval, in seconds, to set the feed
        polling to - default is 300s
        @keyword ingestFromDate: date from which to ingest docs - i.e. do not
        ingest any feed entries before this date - in format, YYYY-MM-DD
        @keyword eXistDBHostname: name of eXist DB to use
        @keyword eXistPortNo: Port number that the eXist DB is exposed by -
        defaults to '8080' if not set
        @keyword configFileName: password file to use - NB, this should
        have contents of format (NB, space delimiter):
        dbName userID password
        Default is 'passwords.txt'.
        @keyword dataCentrePoll: to specify a datacentre to poll data for -
        e.g. 'badc' or 'neodc' - if not set, all data on the specified poll
        url will be retrieved
        @raise ValueError: if interval is not, and cannot be converted to,
        a number
        '''
        # data centre whose config is currently loaded - none as yet
        self.dataCentre = None

        self.dataCentrePoll = dataCentrePoll

        # NB, falsy values (None, 0, '') leave the class level defaults alone
        if interval:
            self.setPollInterval(interval)

        if ingestFromDate:
            self.setIngestFromDate(ingestFromDate)

        self.eXistDBHostname = eXistDBHostname
        self.eXistPortNo = eXistPortNo
        self.configFileName = configFileName

        # polling state: next time the feed is due to be checked and the
        # number of feed entries already examined
        self.refreshTime = 0.
        self.entryNumber = 0

        self._no_files_ingested = 0
        self._no_problem_files = 0
        self._error_messages = ''

        self.deName = VTD().TERM_DATA[VTD.DE_TERM].title

        # this is the base dir that the script is ran from
        self._base_dir = os.getcwd() + "/"


    def __getDBConnection(self, dBHostname, configFileName):
        '''
        Get the default DB connection - by reading in data from the db config
        file - to eXist DB to get feed docs from; the client is stored as
        self.dbClient
        @param dBHostname: the hostname of the eXist DB to use for the ingest
        @param configFileName: name of config file to get DB details from
        '''
        logging.info("Setting up connection to eXist DB")
        self.dbClient = SearchClient(dbHostName = dBHostname,
                                     configFileName = configFileName)
        logging.info("Connection to eXist now set up")


    def setPollInterval(self, interval):
        '''
        Set the polling time interval
        @param interval: an int or float - or a string which can be converted
        to a float - representing the time, in seconds, between checking
        the atom feed for updates
        @raise ValueError: if the interval is not, and cannot be converted
        to, a number
        '''
        if isinstance(interval, (str, unicode)):
            try:
                interval = float(interval)
            except ValueError:
                # report unparseable strings with the same message as any
                # other invalid interval
                raise ValueError(self.INTERVAL_ERROR % interval)

        if not isinstance(interval, (int, float)):
            raise ValueError(self.INTERVAL_ERROR % str(interval))

        self.pollInterval = interval


    def setIngestFromDate(self, ingestFromDate):
        '''
        Set the published date from which feed entries should be ingested
        NB, if formatDateYYYYMMDD returns nothing for the input, the current
        ingest date is left unchanged
        @param ingestFromDate: date from which documents should be ingested - in
        format YYYY-MM-DD
        '''
        ingestDate = formatDateYYYYMMDD(ingestFromDate)
        if not ingestDate:
            return

        self.ingestFromDate = datetime.datetime.strptime(ingestDate,
                                                         YEAR_FORMAT)


    def ingestDIFDocument(self, browseURL):
        '''
        Retrieve the DIF specified at the browseURL, then ingest
        the document into postgres
        NB, if a data centre to poll for has been specified, docs from any
        other data centre are skipped
        @param browseURL: url to retrieve doc from
        '''
        logging.info("Retrieving Atom document from url: '%s'" % browseURL)

        # get the required info out of the URL
        ndgURI = browseURL.split(VTD.BROWSE_STEM_URL)[-1]
        no = ndgObject(ndgURI)

        # set up things for the provider of the data - the data centre is
        # the first dot delimited section of the repository name
        dataCentre = no.repository.split('.')[0]

        # check if we're to ingest data from this data centre
        if self.dataCentrePoll and dataCentre != self.dataCentrePoll:
            logging.info("DIF doc is from a different data centre ('%s') to "
                         "that being polled for ('%s') - skipping"
                         % (dataCentre, self.dataCentrePoll))
            return

        # only bother reloading the data centre info, if we're really interested
        if not self.dataCentre or self.dataCentre != dataCentre:
            self.getConfigDetails(dataCentre)

        self.dataCentre = dataCentre
        self._setupDataCentreDirs()

        doc = self.dbClient.getNDGDoc(no.repository, no.schema, no.localID,
                                      targetCollection = dc.DIF_COLLECTION_PATH)

        # create a temporary file to allow the ingest to do the various
        # xquery transforms on
        difFilename = self.originals_dir + no.localID + '.dif'
        FileUtilities.createFile(difFilename, doc)

        # NB, the number of files processed is not used by the feed ingester
        numfilesproc, processingReport = self._convertAndIngestFiles(
            self.originals_dir, self.discovery_dir, self.dataCentre)

        # for feed ingester, just log report OP
        logging.info("----------------------------------------------------------------")
        logging.info(processingReport)
        logging.info("Document fully ingested into discovery service")
        logging.info("----------------------------------------------------------------")


    def __setupFeedClient(self):
        '''
        Set up a feed client - to allow use of RESTful interface to retrieve
        atoms referenced in the feed
        NB, nothing in this class currently calls this
        '''
        self.feedClient = feedClient("", eXistDBHostname = self.eXistDBHostname,
                                     eXistPortNo = self.eXistPortNo)


    def processFeed(self, feedUrl):
        '''
        Do a single check of the atom feed and ingest any new data found
        NB, this returns immediately, without touching the feed, if the
        next refresh time has not yet been reached
        @param feedUrl: url to the feed to poll
        '''
        # NB, only check the feed once per poll interval
        if time.time() <= self.refreshTime:
            return

        logging.info("- polling for new data...")
        feed = feedparser.parse(feedUrl)

        # check for new entries
        # NOTE(review): this assumes new entries are added to the end of
        # feed.entries - confirm against the ordering used by the feed
        entries = feed.entries
        newEntryNumber = len(entries)
        if newEntryNumber > self.entryNumber:
            logging.info("New data found - checking if this has been published after specified ingest date...")

            for entry in entries[self.entryNumber:newEntryNumber]:
                if self.__isEntryIngestible(entry):
                    self.ingestDIFDocument(entry.content[0].src)

            self.entryNumber = newEntryNumber

        self.refreshTime = time.time() + self.pollInterval
        logging.info("- finished polling for new data")


    def pollFeed(self, feedUrl):
        '''
        Poll the atom feed and ingest any new data found
        NB, this is an infinite loop to allow permanent polling
        @param feedUrl: url to the feed to poll
        '''
        self.refreshTime = 0.
        self.entryNumber = 0

        logging.info("Start polling for atom feed updates")
        while True:
            self.processFeed(feedUrl)

            # wait until the next check is due, rather than spinning round
            # the loop and tying up the processor
            delay = self.refreshTime - time.time()
            if delay > 0:
                time.sleep(delay)


    def __isEntryIngestible(self, entry):
        '''
        Check if a feed entry is valid for ingestion, return True, if so
        - to be ingested, the entry must be later than the specified
        'ingest from' date and must represent a DIF record
        @param entry: feedparser.entry from a feed to check
        @return True, if entry should be ingested, false otherwise
        '''
        # firstly, check the entry is a DIF record
        if entry.title.startswith(feedClient.DIF_ENTRY_TITLE):
            logging.info("- entry represents a DIF record - now check its date")

            # now check the entry is published after the specified ingest date
            # NB, only the year, month and day of the entry are used - so
            # the entry date is always midnight of the day it was updated
            entryDate = datetime.datetime(*entry.updated_parsed[0:3])
            if self.ingestFromDate < entryDate:
                logging.info("- data is new - so valid for ingesting")
                return True

            logging.info("- data is before the ingest time period")
        else:
            logging.info("- entry doesn't represent a DIF record")

        logging.info("- data is not valid for ingesting")
        return False


    def usage(self):
        '''
        Display input params for the script, then exit with status 2
        '''
        # NB, single argument print calls behave identically as python 2
        # print statements
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
        print("Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]")
        print("              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]")
        print(" - where:\n <feed> is the atom feed to ingest data from; options are:")
        print(" -v - verbose mode for output logging")
        print(" -d - debug mode for output logging")
        print(" \nand keywords are:")
        print(" interval - interval, in seconds, at which to retrieve data from the feed")
        print(" ingestFromDate - date, in format, 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time")
        print(" eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'")
        print(" eXistPortNo - port number used by the eXist DB - defaults to '8080'")
        print(" dataCentrePoll - data centre whose documents should be polled for - e.g 'badc', 'neodc' - if not set, all documents on a feed will be ingested")
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
        sys.exit(2)


    def runPoll(self, feedURL):
        '''
        Set up the feed poll and start it running
        NB, this does not return, since the polling loop is infinite
        @param feedURL: url to poll
        - NB, the feedURL will typically be something like
        http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF
        '''
        self.setupPoll()
        self.pollFeed(feedURL)


    def setupPoll(self):
        '''
        Set up everything ready to run the polling client - i.e. the postgres
        and eXist DB connections and the xqueries
        '''
        # NB, could get the docs via RESTful interface in feed client; currently
        # doing this via document retrieve xmlrpc interface
        #self.__setupFeedClient()
        self._getPostgresDBConnection()
        self.__getDBConnection(self.eXistDBHostname, self.configFileName)
        self._setupXQueries()

if __name__ == "__main__":
    # announce the script, then hand over to the (never returning) poll loop
    print("=================================")
    print("RUNNING: FeedDocumentIngester.py")
    ingester = FeedDocumentIngester()

    # the first positional argument returned by the options parser is used
    # as the url of the feed to poll
    args = ingester._setupCmdLineOptions()
    feedURL = args[0]
    ingester.runPoll(feedURL)
Note: See TracBrowser for help on using the repository browser.