source: ndgCommon/trunk/ndg/common/unittests/clients/xmldb/eXist/testatomclient.py @ 4978

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/ndgCommon/trunk/ndg/common/unittests/clients/xmldb/eXist/testatomclient.py@4978
Revision 4978, 9.8 KB checked in by cbyrom, 12 years ago (diff)

Add new test suite for RESTful client + update various other test classes to cope with changes to client + proxy uses.

Line 
1'''
2A test class for the AtomClient class.
3
4@author C Byrom, Tessella Feb 2009
5'''
6
7import unittest,os, time
8from ndg.common.src.clients.xmldb.eXist.atomclient import *
9import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
10import ndg.common.unittests.testconstants as tc
11from ndg.common.unittests.testutils import testUtils as tu
12from ndg.common.src.models import AtomState
13from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
14import ndg.common.src.lib.utilities as utils
15
class TestCase(unittest.TestCase):
    '''
    Tests for the AtomClient class.

    Each test builds an AtomClient from tc.EXIST_DB and
    tc.EXIST_DBCONFIG_FILE.  Paths of documents created during a test are
    collected in self.createdAtoms and deleted again in tearDown.

    Test methods are described with comments rather than docstrings so that
    verbose unittest output keeps reporting the method names.
    '''

    # Class-level default so that tearDown can always read the attribute,
    # even when setUp had no proxy setting to save.
    oldProxy = None

    def setUp(self):
        '''
        set up data used in the tests.
        '''
        # array to store paths to atoms created - in order to then delete them
        self.createdAtoms = []
        # Reset for every test; only filled in below when a proxy setting
        # is actually removed from the environment.
        self.oldProxy = None

        self.client = AtomClient(dbHostName = tc.EXIST_DB,
                                 configFileName = tc.EXIST_DBCONFIG_FILE,
                                 setUpDB = True)
        self.utils = tu(tc.EXIST_DBCONFIG_FILE)

        # NB, if running tests on a local db, feedparser, used by feedclient
        # won't pick up any 'no_proxy' settings - so remove the proxy
        # setting for the duration of the test and restore it in tearDown.
        if self.utils.host == 'localhost' and utils.PROXY_KEY in os.environ:
            self.oldProxy = os.environ[utils.PROXY_KEY]
            del os.environ[utils.PROXY_KEY]

    def tearDown(self):
        '''
        Delete every document registered in self.createdAtoms and restore
        the proxy environment setting if setUp removed it.
        '''
        try:
            for path in self.createdAtoms:
                # delete the test file, in case it has been used in a test
                self.utils.deleteDoc(path)
        finally:
            # Restore the proxy even if a delete failed, so that one broken
            # test cannot leak a modified environment into later tests.
            if self.oldProxy is not None:
                os.environ[utils.PROXY_KEY] = self.oldProxy
                self.oldProxy = None

    # ------------------------------------------------------------------
    # private helpers (names deliberately do not start with 'test')
    # ------------------------------------------------------------------

    def _createStoredAtom(self, xmlString):
        '''
        Create an atom in eXist from the given XML string, register its
        full path for deletion in tearDown, and return the atom.
        '''
        atom = self.utils.createAtomInEXist(xmlString)
        self.createdAtoms.append(atom.getFullPath())
        return atom

    def _firstSchemaDocPath(self):
        '''
        Return the path, inside dc.SCHEMAS_COLLECTION_PATH, of the '.xsd'
        document named after the first key of the client's xsd resources.
        '''
        # list() keeps the indexing valid whether keys() gives a list or a view
        fileName = list(self.client.resources.xsd.keys())[0] + '.xsd'
        return dc.SCHEMAS_COLLECTION_PATH + '/' + fileName

    def _ensureProviderFeed(self, providerCollection, providerID):
        '''
        Create the provider collection and its atom feed if the client
        reports the collection as new; otherwise do nothing.
        '''
        if self.client.isNewCollection(providerCollection):
            feedClient = self.client.feedClient
            self.client.createCollections([providerCollection])
            feedClient.createAtomFeed(
                providerCollection,
                feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE % providerID)

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def testInit(self):
        # the client and the client it wraps must both have been created
        self.assertTrue(self.client)
        self.assertTrue(self.client.client)

    def testValidCheckAtomSchemaCompliance(self):
        # a stored atom built from tc.xmlString yields no compliance errors
        atom = self._createStoredAtom(tc.xmlString)
        self.assertEqual([], self.client.checkAtomSchemaCompliance(atom.getFullPath()))

    def testInvalidCheckAtomSchemaCompliance(self):
        # tc.invalidXmlString yields exactly one error, about 'contributor'
        atom = self._createStoredAtom(tc.invalidXmlString)
        errors = self.client.checkAtomSchemaCompliance(atom.getFullPath())
        self.assertEqual(1, len(errors))
        self.assertTrue(errors[0].startswith(" Invalid content was found starting with element 'contributor'"))

    def testInvalidEmailCheckAtomSchemaCompliance(self):
        # tc.invalidEmailXmlString yields two errors; the second names 'email'
        atom = self._createStoredAtom(tc.invalidEmailXmlString)
        errors = self.client.checkAtomSchemaCompliance(atom.getFullPath())
        self.assertEqual(2, len(errors))
        self.assertTrue("of element 'email' is not valid" in errors[1])

    def testValidCheckAtomSchemaComplianceWithKeyword(self):
        # the atom is passed directly via the 'atom' keyword, with an empty path
        atom = self.utils.createAtom(tc.xmlString)
        self.assertEqual([], self.client.checkAtomSchemaCompliance('', atom = atom))

    def testInValidCheckAtomSchemaCompliance(self):
        # blanking the author name must produce at least one compliance error
        atom = self.utils.createAtom(tc.xmlString)
        atom.author.name = ''
        self.assertNotEqual([], self.client.checkAtomSchemaCompliance('', atom = atom))

    def testGetAtomFileCollectionPath(self):
        # the collection path found must be a prefix of the atom's full path
        atom = self._createStoredAtom(tc.xmlString)
        path = self.client._AtomClient__getAtomFileCollectionPath(atom.datasetID, atom.ME.providerID)
        self.assertTrue(atom.getFullPath().startswith(path))

    def testGetAtomFileCollectionPathInvalidDatasetID(self):
        # an unknown dataset ID gives None
        atom = self._createStoredAtom(tc.xmlString)
        self.assertEqual(None, self.client._AtomClient__getAtomFileCollectionPath(tc.INVALID_FILE, atom.ME.providerID))

    def testGetAtomFileCollectionPathInvalidProviderID(self):
        # an unknown provider ID gives None
        atom = self._createStoredAtom(tc.xmlString)
        self.assertEqual(None, self.client._AtomClient__getAtomFileCollectionPath(atom.datasetID, tc.INVALID_FILE))

    def testGetAtomPublicationState(self):
        # a freshly stored atom is reported in the working state
        atom = self._createStoredAtom(tc.xmlString)
        state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
        self.assertEqual(AtomState.WORKING_STATE.title, state.title)
        self.assertEqual(AtomState.WORKING_STATE, state)

    def testChangeAtomPublicationState(self):
        # working -> old, then back to working again
        atom = self._createStoredAtom(tc.xmlString)
        state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
        self.assertEqual(AtomState.WORKING_STATE.title, state.title)
        self.client.changeAtomPublicationState(atom, AtomState.OLD_STATE)
        try:
            state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
            self.assertEqual(AtomState.OLD_STATE, state)
        finally:
            # Always put the atom back, even when the assertion fails -
            # presumably so the path registered for cleanup stays valid
            # (NOTE(review): confirm against AtomClient).
            self.client.changeAtomPublicationState(atom, AtomState.WORKING_STATE)

    def testCreateNoneAtom(self):
        self.assertRaises(ValueError, self.client.createAtom, None)

    def testCreateNotAtom(self):
        self.assertRaises(ValueError, self.client.createAtom, [1,2,4])

    def testCreateAtom(self):
        atom = self.utils.createAtom(tc.xmlString)
        createdAtom = self.client.createAtom(atom)
        # check the result before using it, so that a None return is
        # reported as a test failure rather than an AttributeError
        self.assertNotEqual(None, createdAtom)
        self.createdAtoms.append(createdAtom.getFullPath())

    def testSetUpEXistDB(self):
        # delete a schema doc, then check that a client built with
        # setUpDB = True puts it back
        pathName = self._firstSchemaDocPath()
        self.client.deleteDoc(pathName)
        self.assertEqual(True, self.client.isNewDoc(pathName))

        # constructed only for its effect on the DB
        AtomClient(dbHostName = tc.EXIST_DB,
                   configFileName = tc.EXIST_DBCONFIG_FILE,
                   setUpDB = True)

        self.assertEqual(False, self.client.isNewDoc(pathName))

    def testNotSetUpEXistDB(self):
        # delete a schema doc, then check that a client built with
        # setUpDB = False does NOT put it back.  The path is built with the
        # same '/' separator as in testSetUpEXistDB, so that both tests
        # look at the same document.
        pathName = self._firstSchemaDocPath()
        self.client.deleteDoc(pathName)
        self.assertEqual(True, self.client.isNewDoc(pathName))

        # constructed only to show that it leaves the DB untouched
        AtomClient(dbHostName = tc.EXIST_DB,
                   configFileName = tc.EXIST_DBCONFIG_FILE,
                   setUpDB = False)

        self.assertEqual(True, self.client.isNewDoc(pathName))

    def testRunAsynchAtomPublish(self):
        # the atom is registered for cleanup before the first assertion,
        # so it is still deleted if that assertion fails
        createdAtom = self._createStoredAtom(tc.xmlString)
        self.assertEqual(False, self.client.isNewDoc(createdAtom.getFullPath()))
        createdAtom.state = AtomState.PUBLISHED_STATE
        providerID = createdAtom.ME.providerID
        providerCollection = dc.PROVIDER_FEED_PATH + providerID
        feed = self.client.feedClient.getAtomFeed(providerCollection)

        self._ensureProviderFeed(providerCollection, providerID)
        self.client._AtomClient__runAsynchAtomPublish(createdAtom)
        # give the asynchronous publish a chance to complete
        time.sleep(2)
        feed2 = self.client.feedClient.getAtomFeed(providerCollection)
        self.assertNotEqual(feed, feed2)

    def testPublishAtom(self):
        createdAtom = self.utils.createAtomInEXist(tc.xmlString)
        createdAtom.state = AtomState.PUBLISHED_STATE
        providerID = createdAtom.ME.providerID
        providerCollection = dc.PROVIDER_FEED_PATH + providerID
        # NB, the path is registered after the state change, as it always
        # has been here - NOTE(review): confirm whether getFullPath depends
        # on the atom state before moving this line.
        self.createdAtoms.append(createdAtom.getFullPath())
        feed = self.client.feedClient.getAtomFeed(providerCollection)

        self._ensureProviderFeed(providerCollection, providerID)
        self.client._AtomClient__publishAtom(createdAtom)
        feed2 = self.client.feedClient.getAtomFeed(providerCollection)
        self.assertNotEqual(feed, feed2)

    def testCreateDIFDocumentFromAtom(self):
        atom = self.utils.createAtom(tc.xmlString)
        atom.atomTypeID = VTD.DE_TERM
        atom = self.client.createAtom(atom)
        self.createdAtoms.append(atom.getFullPath())
        filePath = self.client._createDIFDocumentFromAtom(atom, dc.DIF_COLLECTION_PATH)
        # the generated DIF doc needs deleting too
        self.createdAtoms.append(filePath)
        self.assertEqual(False, self.client.isNewDoc(filePath))

    def testTransformAtomIntoDIF(self):
        atom = self.utils.createAtom(tc.xmlString)
        atom.atomTypeID = VTD.DE_TERM
        atom = self.client.createAtom(atom)
        self.createdAtoms.append(atom.getFullPath())
        doc = self.client._transformAtomIntoDIF(atom)
        self.assertNotEqual(None, doc)
        self.assertTrue(doc.find('DIF') > -1)
        self.assertTrue(doc.find(atom.datasetID) > -1)

    def testNotLoadCollectionData(self):
        # the client held by the test utilities has no collection data
        self.assertFalse(self.utils.ac.atomCollections)

    def testLoadCollectionData(self):
        createdAtom = self._createStoredAtom(tc.xmlString)
        filePath = createdAtom.getFullPath()
        newClient = AtomClient(dbHostName = tc.EXIST_DB,
                               configFileName = tc.EXIST_DBCONFIG_FILE,
                               loadCollectionData = True)
        self.assertTrue(newClient.atomCollections)
        # collection path for the dataset + atom name must give the full path
        expectedPath = newClient.atomCollections[createdAtom.datasetID] + \
                       '/' + createdAtom.atomName
        self.assertEqual(filePath, expectedPath)
224           
# Run the whole suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.