source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py @ 4854

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py@4854
Revision 4854, 9.9 KB checked in by cbyrom, 11 years ago (diff)

Add new ingest script - to allow ingest of DIF docs from an eXist-hosted
atom feed. NB, this required a restructure of the original OAI harvester
to allow re-use of shared code - by abstracting this out into a new class,
abstractdocumentingester.

Add new documentation and tidy up codebase removing dependencies where possible to simplify things.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Class to ingest documents from the eXist DB - by polling its feed and doing
4 ingests when updates are detected
5 @author: C Byrom, Tessella Jan 09
6'''
7import os, sys, logging, time, feedparser, datetime
8from ndg.common.src.lib.utilities import formatDateYYYYMMDD, YEAR_FORMAT
9import ndg.common.src.lib.fileutilities as FileUtilities
10from ndg.common.src.clients.xmldb.eXist.existdbfeedclient import eXistDBFeedClient as feedClient
11from abstractdocumentingester import AbstractDocumentIngester
12from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
13from ndg.common.src.lib.atomutilities import *
14from ndg.common.src.models.ndgObject import ndgObject as ndgObject
15from ndg.common.src.dal.DocumentRetrieve import DocumentRetrieve as DR
16from ndg.common.src.clients.xmldb.eXist.eXistConnector import eXistConnector as ec
17
class FeedDocumentIngester(AbstractDocumentIngester):
    '''
    Class to handle the ingest of DIF documents referenced by an eXist-hosted
    atom feed.  The feed is polled at a configurable interval; entries that
    represent DIF records, and which were updated on or after the configured
    'ingest from' date, are retrieved from eXist and ingested via the shared
    AbstractDocumentIngester machinery.
    '''
    INTERVAL_ERROR = "Interval, '%s', is not a float or an int"

    # default interval, in seconds, with which to check the feed for updates
    pollInterval = 300.0

    # time from which feed entries will be ingested - NB, __init__ sets a
    # per-instance value (the creation time, in UTC); this class-level value,
    # evaluated at import time, is only a fallback
    ingestFromDate = datetime.datetime.utcnow()

    # temp dir to create docs for running transforms on
    TEMP_DIR_NAME = "feedPollTempDir"

    def __getEXistDBConnection(self, eXistDBHostname='chinook.badc.rl.ac.uk'):
        '''
        Set up a DocumentRetrieve connection (stored as self.dr) to the eXist
        DB from which the feed documents are retrieved
        NB, only the hostname is passed on - self.eXistPortNo is not used here
        @keyword eXistDBHostname: the hostname of the eXist DB to use for the
        ingest - defaults to chinook
        '''
        logging.info("Setting up connection to eXist DB")
        self.dr = DR(eXistDBHostname)
        logging.info("Connection to eXist now set up")

    def setPollInterval(self, interval):
        '''
        Set the polling time interval
        @param interval: an int or float - or a string representation of a
        number - giving the time, in seconds, between checks of the atom feed
        for updates
        @raise ValueError: if interval cannot be interpreted as a number
        '''
        # keep numeric values as given; anything else (e.g. a command line
        # string) is converted to a float
        if isinstance(interval, (int, float)):
            self.pollInterval = interval
            return

        try:
            self.pollInterval = float(interval)
        except (TypeError, ValueError):
            raise ValueError(self.INTERVAL_ERROR % (interval,))

    def setIngestFromDate(self, ingestFromDate):
        '''
        Set the date from which feed entries should be ingested
        @param ingestFromDate: date from which documents should be ingested - in
        format YYYY-MM-DD; it is taken as midnight (UTC) at the start of that day
        '''
        ingestDate = formatDateYYYYMMDD(ingestFromDate)
        if ingestDate:
            self.ingestFromDate = datetime.datetime.strptime(ingestDate,
                                                             YEAR_FORMAT)
        else:
            logging.warning("Ingest from date, '%s', is not valid - ignoring it",
                            ingestFromDate)

    def ingestDIFDocument(self, browseURL):
        '''
        Retrieve the DIF document referenced by browseURL from eXist, then
        ingest it into the discovery service
        @param browseURL: url of the doc - the part after VTD.BROWSE_STEM_URL is
        used as the ndg URI identifying the document
        '''
        logging.info("Retrieving Atom document from url: '%s'", browseURL)

        # get the required info out of the URL
        ndgURI = browseURL.split(VTD.BROWSE_STEM_URL)[-1]
        no = ndgObject(ndgURI)

        # the data centre is the first dot-separated part of the repository id
        dataCentre = no.repository.split('.')[0]

        # check if we're to ingest data from this data centre
        if self.dataCentrePoll and dataCentre != self.dataCentrePoll:
            logging.info("DIF doc is from a different data centre ('%s') to "
                         "that being polled for ('%s') - skipping",
                         dataCentre, self.dataCentrePoll)
            return

        # only bother reloading the data centre info if it has changed
        if not self.dataCentre or self.dataCentre != dataCentre:
            self.getConfigDetails(dataCentre)

        self.dataCentre = dataCentre
        self._setupDataCentreDirs()

        docData = self.dr.get(no.repository, no.schema, no.localID,
                              targetCollection=ec.DIF_COLLECTION_PATH,
                              includeDocNameData=True)
        if not docData:
            logging.error("No DIF document returned for '%s' - skipping", ndgURI)
            return

        # only the first returned (filename, contents) pair is used
        difFilename, contents = list(docData.items())[0]

        # create a temporary file to allow the ingest to do the various
        # xquery transforms on
        difFilename = self.originals_dir + difFilename
        FileUtilities.createFile(difFilename, contents)

        self._convertAndIngestFiles(self.originals_dir, self.discovery_dir)

        logging.info("Document fully ingested into discovery service")

    def __setupFeedClient(self):
        '''
        Set up a feed client - to allow use of RESTful interface to retrieve
        atoms referenced in the feed
        NB, not currently called - docs are retrieved via DocumentRetrieve
        '''
        self.feedClient = feedClient("", eXistDBHostname=self.eXistDBHostname,
                                     eXistPortNo=self.eXistPortNo)

    def processFeed(self, feedUrl):
        '''
        Poll the atom feed once - if the poll interval has elapsed - and ingest
        any new entries that are valid for ingestion
        NB, entries beyond the count seen on the previous poll (self.entryNumber)
        are treated as new - i.e. new entries are assumed to be appended to the
        end of the feed
        @param feedUrl: url to the feed to poll
        '''
        # only check the feed once the poll interval has elapsed
        if time.time() <= self.refreshTime:
            return

        logging.info("- polling for new data...")
        feed = feedparser.parse(feedUrl)

        # check for new entries
        newEntryNumber = len(feed.entries)
        if newEntryNumber > self.entryNumber:
            logging.info("New data found - checking if this has been published after specified ingest date...")

            for entry in feed.entries[self.entryNumber:newEntryNumber]:
                if self.__isEntryIngestible(entry):
                    self.ingestDIFDocument(entry.content[0].src)

            self.entryNumber = newEntryNumber

        self.refreshTime = time.time() + self.pollInterval
        logging.info("- finished polling for new data")

    def pollFeed(self, feedUrl):
        '''
        Poll the atom feed and ingest any new data found
        NB, this is an infinite loop to allow permanent polling
        @param feedUrl: url to the feed to poll
        '''
        self.refreshTime = 0.
        self.entryNumber = 0

        logging.info("Start polling for atom feed updates")
        while True:
            self.processFeed(feedUrl)
            # processFeed schedules the next poll in self.refreshTime - sleep
            # until then rather than busy-waiting on the CPU
            time.sleep(max(0., self.refreshTime - time.time()))

    def __isEntryIngestible(self, entry):
        '''
        Check if a feed entry is valid for ingestion
        - to be ingested, the entry must represent a DIF record and must have
        been updated on or after the specified 'ingest from' date
        @param entry: feedparser entry from a feed to check
        @return: True, if entry should be ingested, False otherwise
        '''
        # firstly, check the entry is a DIF record
        if not getattr(entry, 'title', '').startswith(feedClient.DIF_ENTRY_TITLE):
            logging.info("- entry doesn't represent a DIF record")
        else:
            logging.info("- entry represents a DIF record - now check its date")

            # now check the entry was updated on or after the ingest date
            # NB, use the full timestamp: comparing only the day part would put
            # today's entries at midnight - i.e. before a default ingestFromDate
            # of 'now' - so they would never be ingested
            updated = getattr(entry, 'updated_parsed', None)
            if updated is None:
                logging.info("- entry has no parseable updated date")
            else:
                entryDate = datetime.datetime(*updated[0:6])
                if entryDate >= self.ingestFromDate:
                    logging.info("- data is new - so valid for ingesting")
                    return True
                logging.info("- data is before the ingest time period")

        logging.info("- data is not valid for ingesting")
        return False

    def usage(self):
        '''
        Display input params for the script, then exit with status 2
        '''
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
        print("Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]")
        print("              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]")
        print(" - where:\n <feed> is the atom feed to ingest data from; options are:")
        print(" -v - verbose mode for output logging")
        print(" -d - debug mode for output logging")
        print(" \nand keywords are:")
        print(" interval - interval, in seconds, at which to retrieve data from the feed")
        print(" ingestFromDate - date, in format, 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time")
        print(" eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'")
        print(" eXistPortNo - port number used by the eXist DB - defaults to '8080'")
        print(" dataCentrePoll - data centre whose documents should be polled for - e.g 'badc', 'neodc' - if not set, all documents on a feed will be ingested")
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
        sys.exit(2)

    def __init__(self, interval = None, ingestFromDate = None,
                 eXistDBHostname = 'chinook.badc.rl.ac.uk', eXistPortNo = '8080',
                 dataCentrePoll = None):
        '''
        Set up object
        @keyword interval: polling interval, in seconds, to set the feed polling
        to - default is 300s
        @keyword ingestFromDate: date from which to ingest docs - i.e. do not
        ingest any feed entries before this date - in format, YYYY-MM-DD; if not
        set, the time this object is created (UTC) is used
        @keyword eXistDBHostname: name of eXist DB to use
        @keyword eXistPortNo: Port number that the eXist DB is exposed by -
        defaults to '8080' if not set
        @keyword dataCentrePoll: to specify a datacentre to poll data for - e.g.
        'badc' or 'neodc' - if not set, all data on the specified poll url will
        be retrieved
        '''
        self.dataCentre = None

        self.dataCentrePoll = dataCentrePoll

        # default to ingesting entries updated from now on; UTC, to match the
        # feedparser-parsed entry dates it is compared against
        self.ingestFromDate = datetime.datetime.utcnow()

        if interval:
            self.setPollInterval(interval)

        if ingestFromDate:
            self.setIngestFromDate(ingestFromDate)

        self.eXistDBHostname = eXistDBHostname
        self.eXistPortNo = eXistPortNo
        self.refreshTime = 0.
        self.entryNumber = 0
        self._no_files_ingested = 0
        self._no_problem_files = 0

        self.deName = VTD().TERM_DATA[VTD.DE_TERM].title

        # this is the base dir that the script is ran from
        self._base_dir = os.getcwd() + "/"

    def runPoll(self, feedURL):
        '''
        Set up the feed poll and start it running - NB, this does not return
        @param feedURL: url to poll
        - NB, the feedURL will typically be something like
        http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF
        '''
        self.setupPoll()
        self.pollFeed(feedURL)

    def setupPoll(self):
        '''
        Set up everything ready to run the polling client: the postgres
        connection, the eXist connection and the xqueries
        '''
        # NB, could get the docs via RESTful interface in feed client; currently
        # doing this via document retrieve xmlrpc interface
        self._getPostgresDBConnection()
        self.__getEXistDBConnection(eXistDBHostname = self.eXistDBHostname)
        self._setupXQueries()
270       
if __name__ == "__main__":
    # script entry point: parse the command line, then poll the given feed
    # indefinitely
    print("=================================")
    print("RUNNING: FeedDocumentIngester.py")
    poller = FeedDocumentIngester()
    cmdLineArgs = poller._setupCmdLineOptions()
    feedURL = cmdLineArgs[0]
    poller.runPoll(feedURL)
Note: See TracBrowser for help on using the repository browser.