source: ndgCommon/trunk/ndg/common/unittests/clients/xmldb/eXist/testatomclient.py @ 4942

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/ndgCommon/trunk/ndg/common/unittests/clients/xmldb/eXist/testatomclient.py@4942
Revision 4942, 9.7 KB checked in by cbyrom, 11 years ago (diff)

Replace old eXist connector tests with tests based on the new clients
+ extend and fix this wherever necessary.

Line 
1'''
2A test class for the AtomClient class.
3
4@author C Byrom, Tessella Feb 2009
5'''
6
7import unittest,os, time
8from ndg.common.src.clients.xmldb.eXist.atomclient import *
9import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
10import ndg.common.unittests.testconstants as tc
11from ndg.common.unittests.testutils import testUtils as tu
12from ndg.common.src.models import AtomState
13from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
14
15class TestCase(unittest.TestCase): 
16
17    def setUp(self):
18        '''
19        set up data used in the tests.
20        '''
21        self.client = AtomClient(dbHostName = tc.EXIST_DB,
22                                 configFileName = tc.EXIST_DBCONFIG_FILE, 
23                                 setUpDB = True)
24        self.createdAtoms = []    # array to store paths to atoms created - in order to then delete them
25        self.utils = tu(tc.EXIST_DBCONFIG_FILE)
26       
27        # NB, if running tests on a local db, feedparser, used by feedclient won't
28        # pick up any 'no_proxy' settings - so
29        if self.utils.host == 'localhost' and os.environ.has_key(tc.PROXY_KEY):
30            self.oldProxy = os.environ[tc.PROXY_KEY]
31            del os.environ[tc.PROXY_KEY]
32       
33
34    def testInit(self):
35        self.assertTrue(self.client)
36        self.assertTrue(self.client.client)
37
38       
39    def testValidCheckAtomSchemaCompliance(self):
40        atom = self.utils.createAtomInEXist(tc.xmlString)
41        self.createdAtoms.append(atom.getFullPath())
42        atomPath = atom.getFullPath()
43        self.assertEqual([], self.client.checkAtomSchemaCompliance(atomPath))
44       
45    def testInvalidCheckAtomSchemaCompliance(self):
46        atom = self.utils.createAtomInEXist(tc.invalidXmlString)
47        self.createdAtoms.append(atom.getFullPath())
48        atomPath = atom.getFullPath()
49        errors = self.client.checkAtomSchemaCompliance(atomPath)
50        self.assertEqual(1, len(errors))
51        self.assertTrue(errors[0].startswith(" Invalid content was found starting with element 'contributor'"))
52       
53    def testInvalidEmailCheckAtomSchemaCompliance(self):
54        atom = self.utils.createAtomInEXist(tc.invalidEmailXmlString)
55        self.createdAtoms.append(atom.getFullPath())
56        atomPath = atom.getFullPath()
57        errors = self.client.checkAtomSchemaCompliance(atomPath)
58        self.assertEqual(2, len(errors))
59        self.assertTrue(errors[1].find("of element 'email' is not valid") > -1)
60
61    def testValidCheckAtomSchemaComplianceWithKeyword(self):
62        atom = self.utils.createAtom(tc.xmlString)
63        self.assertEqual([], self.client.checkAtomSchemaCompliance('', atom = atom))
64
65    def testInValidCheckAtomSchemaCompliance(self):
66        atom = self.utils.createAtom(tc.xmlString)
67        atom.author.name = ''
68        self.assertNotEqual([], self.client.checkAtomSchemaCompliance('', atom = atom))
69
70    def testGetAtomFileCollectionPath(self):
71        atom = self.utils.createAtomInEXist(tc.xmlString)
72        self.createdAtoms.append(atom.getFullPath())
73        path = self.client._AtomClient__getAtomFileCollectionPath(atom.datasetID, atom.ME.providerID)
74        self.assertTrue(atom.getFullPath().startswith(path))
75           
76    def testGetAtomFileCollectionPathInvalidDatasetID(self):
77        atom = self.utils.createAtomInEXist(tc.xmlString)
78        self.createdAtoms.append(atom.getFullPath())
79        self.assertEquals(None, self.client._AtomClient__getAtomFileCollectionPath(tc.INVALID_FILE, atom.ME.providerID))
80           
81    def testGetAtomFileCollectionPathInvalidProviderID(self):
82        atom = self.utils.createAtomInEXist(tc.xmlString)
83        self.createdAtoms.append(atom.getFullPath())
84        self.assertEquals(None, self.client._AtomClient__getAtomFileCollectionPath(atom.datasetID, tc.INVALID_FILE))
85
86    def testGetAtomPublicationState(self):
87        atom = self.utils.createAtomInEXist(tc.xmlString)
88        self.createdAtoms.append(atom.getFullPath())
89        state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
90        self.assertEqual(AtomState.WORKING_STATE.title, state.title)
91        self.assertEqual(AtomState.WORKING_STATE, state)
92
93
94    def testChangeAtomPublicationState(self):
95        atom = self.utils.createAtomInEXist(tc.xmlString)
96        self.createdAtoms.append(atom.getFullPath())
97        state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
98        self.assertEqual(AtomState.WORKING_STATE.title, state.title)
99        self.client.changeAtomPublicationState(atom, AtomState.OLD_STATE)
100        state = self.client.getAtomPublicationState(atom.datasetID, atom.ME.providerID)
101        self.assertEqual(AtomState.OLD_STATE, state)
102        self.client.changeAtomPublicationState(atom, AtomState.WORKING_STATE)
103       
104       
105    def testCreateNoneAtom(self):
106        self.assertRaises(ValueError,self.client.createAtom, None)
107
108    def testCreateNotAtom(self):
109        self.assertRaises(ValueError,self.client.createAtom, [1,2,4])
110
111    def testCreateAtom(self):
112        atom = self.utils.createAtom(tc.xmlString)
113        createdAtom = self.client.createAtom(atom)
114        self.createdAtoms.append(createdAtom.getFullPath())
115        self.assertNotEquals(None, createdAtom)
116
117
118    def testSetUpEXistDB(self):
119        fileName = self.client.resources.xsd.keys()[0] + '.xsd'
120        pathName = dc.SCHEMAS_COLLECTION_PATH + '/' + fileName
121        self.client.deleteDoc(pathName)
122        self.assertEquals(True, self.client.isNewDoc(pathName))
123
124        newClient = AtomClient(dbHostName = tc.EXIST_DB,
125                               configFileName = tc.EXIST_DBCONFIG_FILE, 
126                               setUpDB = True)
127
128        self.assertEquals(False, self.client.isNewDoc(pathName))
129
130
131    def testNotSetUpEXistDB(self):
132        fileName = self.client.resources.xsd.keys()[0] + '.xsd'
133        pathName = dc.SCHEMAS_COLLECTION_PATH + fileName
134        self.client.deleteDoc(pathName)
135        self.assertEquals(True, self.client.isNewDoc(pathName))
136
137        newClient = AtomClient(dbHostName = tc.EXIST_DB,
138                               configFileName = tc.EXIST_DBCONFIG_FILE, 
139                               setUpDB = False)
140
141        self.assertEquals(True, self.client.isNewDoc(pathName))
142
143           
144    def testRunAsynchAtomPublish(self):
145        createdAtom = self.utils.createAtomInEXist(tc.xmlString)
146        pathName = createdAtom.getFullPath()
147        self.assertEquals(False, self.client.isNewDoc(pathName))
148        createdAtom.state = AtomState.PUBLISHED_STATE
149        providerCollection = dc.PROVIDER_FEED_PATH + createdAtom.ME.providerID
150        self.createdAtoms.append(pathName)
151        feed = self.client.feedClient.getAtomFeed(providerCollection)
152       
153        if self.client.isNewCollection(providerCollection):
154            self.client.createCollections([providerCollection])
155            self.client.feedClient.createAtomFeed(providerCollection,
156                           self.client.feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE %createdAtom.ME.providerID)
157        self.client._AtomClient__runAsynchAtomPublish(createdAtom)
158        time.sleep(2)
159        feed2 = self.client.feedClient.getAtomFeed(providerCollection)
160        self.assertNotEqual(feed, feed2)
161
162           
163    def testPublishAtom(self):
164        createdAtom = self.utils.createAtomInEXist(tc.xmlString)
165        createdAtom.state = AtomState.PUBLISHED_STATE
166        providerCollection = dc.PROVIDER_FEED_PATH + createdAtom.ME.providerID
167        self.createdAtoms.append(createdAtom.getFullPath())
168        feed = self.client.feedClient.getAtomFeed(providerCollection)
169       
170        if self.client.isNewCollection(providerCollection):
171            self.client.createCollections([providerCollection])
172            self.client.feedClient.createAtomFeed(providerCollection,
173                           self.client.feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE %createdAtom.ME.providerID)
174        self.client._AtomClient__publishAtom(createdAtom)
175        feed2 = self.client.feedClient.getAtomFeed(providerCollection)
176        self.assertNotEqual(feed, feed2)
177
178
179    def testCreateDIFDocumentFromAtom(self):
180        atom = self.utils.createAtom(tc.xmlString)
181        atom.atomTypeID = VTD.DE_TERM
182        atom = self.client.createAtom(atom)
183        self.createdAtoms.append(atom.getFullPath())
184        filePath = self.client._createDIFDocumentFromAtom(atom, dc.DIF_COLLECTION_PATH)
185        self.createdAtoms.append(filePath)
186        self.assertEquals(False, self.client.isNewDoc(filePath))
187
188
189    def testTransformAtomIntoDIF(self):
190        atom = self.utils.createAtom(tc.xmlString)
191        atom.atomTypeID = VTD.DE_TERM
192        atom = self.client.createAtom(atom)
193        self.createdAtoms.append(atom.getFullPath())
194        doc = self.client._transformAtomIntoDIF(atom)
195        self.assertNotEquals(None, doc)
196        self.assertTrue(doc.find('DIF') > -1)
197        self.assertTrue(doc.find(atom.datasetID) > -1)
198
199
200    def testNotLoadCollectionData(self):
201        self.assertFalse(self.utils.ac.atomCollections)
202
203
204    def testLoadCollectionData(self):
205        createdAtom = self.utils.createAtomInEXist(tc.xmlString)
206        filePath = createdAtom.getFullPath()
207        self.createdAtoms.append(filePath)
208        newClient = AtomClient(dbHostName = tc.EXIST_DB, 
209                               configFileName = tc.EXIST_DBCONFIG_FILE,
210                               loadCollectionData = True)
211        self.assertTrue(newClient.atomCollections)
212        self.assertEquals(filePath, newClient.atomCollections[createdAtom.datasetID] + \
213                          '/' + createdAtom.atomName)
214
215       
216    def tearDown(self):
217        for path in self.createdAtoms:
218            # delete the test file, in case it has been used in a test
219            self.utils.deleteDoc(path)
220
221           
222if __name__=="__main__":
223    unittest.main()
Note: See TracBrowser for help on using the repository browser.