Merge branch 'master' into api_versioning

Conflicts:
	tests/test_tags.py
This commit is contained in:
sneakypete81 2013-04-28 18:38:13 +01:00
commit 1b2f0cd869
8 changed files with 76 additions and 62 deletions

View file

@ -15,9 +15,8 @@ class ApiTag:
self._client = client
def create(self, tag, **kwds):
""" Create a new tag and return it """
result = self._client.post("/tag/create.json", tag=tag, **kwds)["result"]
return Tag(self._client, result)
""" Create a new tag. The API returns true if the tag was sucessfully created """
return self._client.post("/tag/create.json", tag=tag, **kwds)["result"]
def delete(self, tag, **kwds):
"""

View file

@ -6,6 +6,10 @@ class OpenPhotoDuplicateError(OpenPhotoError):
""" Indicates that an upload operation failed due to a duplicate photo """
pass
class OpenPhoto404Error(Exception):
""" Indicates that an Http 404 error code was received (resource not found) """
pass
class NotImplementedError(OpenPhotoError):
""" Indicates that the API function has not yet been coded - please help! """
pass

View file

@ -1,3 +1,4 @@
import urllib
from errors import *
class OpenPhotoObject:
@ -122,13 +123,13 @@ class Tag(OpenPhotoObject):
Returns True if successful.
Raises an OpenPhotoError if not.
"""
result = self._openphoto.post("/tag/%s/delete.json" % self.id, **kwds)["result"]
result = self._openphoto.post("/tag/%s/delete.json" % urllib.quote(self.id), **kwds)["result"]
self._replace_fields({})
return result
def update(self, **kwds):
""" Update this tag with the specified parameters """
new_dict = self._openphoto.post("/tag/%s/update.json" % self.id,
new_dict = self._openphoto.post("/tag/%s/update.json" % urllib.quote(self.id),
**kwds)["result"]
self._replace_fields(new_dict)

View file

@ -1,7 +1,6 @@
import oauth2 as oauth
import urlparse
import urllib
import urllib2
import httplib2
import logging
try:
@ -59,7 +58,7 @@ class OpenPhotoHttp:
else:
client = httplib2.Http()
_, content = client.request(url, "GET")
response, content = client.request(url, "GET")
self._logger.info("============================")
self._logger.info("GET %s" % url)
@ -68,11 +67,10 @@ class OpenPhotoHttp:
self.last_url = url
self.last_params = params
self.last_response = content
self.last_response = (response, content)
if process_response:
return self._process_response(content)
return response
return self._process_response(response, content)
else:
return content
@ -103,28 +101,28 @@ class OpenPhotoHttp:
if files:
# Parameters must be signed and encoded into the multipart body
params = self._sign_params(client, url, params)
headers, body = encode_multipart_formdata(params, files)
request = urllib2.Request(url, body, headers)
content = urllib2.urlopen(request).read()
signed_params = self._sign_params(client, url, params)
headers, body = encode_multipart_formdata(signed_params, files)
else:
body = urllib.urlencode(params)
_, content = client.request(url, "POST", body)
headers = None
response, content = client.request(url, "POST", body, headers)
# TODO: Don't log file data in multipart forms
self._logger.info("============================")
self._logger.info("POST %s" % url)
if body:
self._logger.info(body)
self._logger.info("params: %s" % repr(params))
if files:
self._logger.info("files: %s" % repr(files))
self._logger.info("---")
self._logger.info(content)
self.last_url = url
self.last_params = params
self.last_response = content
self.last_response = (response, content)
if process_response:
return self._process_response(content)
return self._process_response(response, content)
else:
return content
@ -171,26 +169,32 @@ class OpenPhotoHttp:
return processed_params
@staticmethod
def _process_response(content):
def _process_response(response, content):
"""
Decodes the JSON response, returning a dict.
Raises an exception if an invalid response code is received.
"""
response = json.loads(content)
try:
json_response = json.loads(content)
code = json_response["code"]
message = json_response["message"]
except ValueError, KeyError:
# Response wasn't OpenPhoto JSON - check the HTTP status code
if 200 <= response.status < 300:
# Status code was valid, so just reraise the exception
raise
elif response.status == 404:
raise OpenPhoto404Error("HTTP Error %d: %s" % (response.status, response.reason))
else:
raise OpenPhotoError("HTTP Error %d: %s" % (response.status, response.reason))
if response["code"] >= 200 and response["code"] < 300:
# Valid response code
return response
error_message = "Code %d: %s" % (response["code"],
response["message"])
# Special case for a duplicate photo error
if (response["code"] == DUPLICATE_RESPONSE["code"] and
DUPLICATE_RESPONSE["message"] in response["message"]):
raise OpenPhotoDuplicateError(error_message)
raise OpenPhotoError(error_message)
if 200 <= code < 300:
return json_response
elif (code == DUPLICATE_RESPONSE["code"] and
DUPLICATE_RESPONSE["message"] in message):
raise OpenPhotoDuplicateError("Code %d: %s" % (code, message))
else:
raise OpenPhotoError("Code %d: %s" % (code, message))
@staticmethod
def _result_to_list(result):

View file

@ -9,7 +9,7 @@ A computer, Python 2.7 and an empty OpenPhoto instance.
---------------------------------------
<a name="setup"></a>
### Setting up
### Setting up
Create a tests/tokens.py file containing the following:
@ -34,7 +34,9 @@ The "-c" lets you stop the tests gracefully with \[CTRL\]-c.
The easiest way to run a subset of the tests is with nose:
cd /path/to/openphoto-python
nosetests -v -s tests/test_albums.py:TestAlbums.test_view
nosetests -v -s --nologcapture tests/test_albums.py:TestAlbums.test_view
All HTTP requests and responses are recorded in the file "tests.log".
---------------------------------------
<a name="test_details"></a>

View file

@ -7,12 +7,11 @@ try:
except ImportError:
print ("********************************************************************\n"
"You need to create a 'tokens.py' file containing the following:\n\n"
" host = \"<test_url>\"\n"
" host = \"<hostname>\"\n"
" consumer_key = \"<test_consumer_key>\"\n"
" consumer_secret = \"<test_consumer_secret>\"\n"
" token = \"<test_token>\"\n"
" token_secret = \"<test_token_secret>\"\n"
" host = \"<hostname>\"\n\n"
"WARNING: Don't use a production OpenPhoto instance for this!\n"
"********************************************************************\n")
raise
@ -29,7 +28,6 @@ class TestBase(unittest.TestCase):
unittest.TestCase.__init__(self, *args, **kwds)
self.photos = []
LOG_FILENAME = "tests.log"
logging.basicConfig(filename="tests.log",
filemode="w",
format="%(message)s",

View file

@ -25,9 +25,11 @@ class TestPhotos(test_base.TestBase):
self.client.photo.upload_encoded("tests/test_photo3.jpg",
title=self.TEST_TITLE)
# Check there are now three photos
# Check there are now three photos with the correct titles
self.photos = self.client.photos.list()
self.assertEqual(len(self.photos), 3)
for photo in self.photos:
self.assertEqual(photo.title, self.TEST_TITLE)
# Check that the upload return value was correct
pathOriginals = [photo.pathOriginal for photo in self.photos]

View file

@ -5,31 +5,33 @@ import test_base
class TestTags(test_base.TestBase):
testcase_name = "tag API"
@unittest.expectedFailure # Tag create fails - Issue #927
# NOTE: the below has not been tested/debugged, since it fails at the first step
def test_create_delete(self, tag_name="create_tag"):
def test_create_delete(self, tag_id="create_tag"):
""" Create a tag then delete it """
# Create a tag
tag = self.client.tag.create(tag_name)
self.assertTrue(self.client.tag.create(tag_id))
# Check that the tag doesn't exist (It has no photos, so it's invisible)
self.assertNotIn(tag_id, [t.id for t in self.client.tags.list()])
# Check the return value
self.assertEqual(tag.id, tag_name)
# Create a tag on one of the photos
self.photos[0].update(tagsAdd=tag_id)
# Check that the tag now exists
self.assertIn(tag_name, self.client.tags.list())
self.assertIn(tag_id, [t.id for t in self.client.tags.list()])
# Delete the tag
self.assertTrue(self.client.tag.delete(tag_name))
self.assertTrue(self.client.tag.delete(tag_id))
# Check that the tag is now gone
self.assertNotIn(tag_name, self.client.tags.list())
self.assertNotIn(tag_id, [t.id for t in self.client.tags.list()])
# Create and delete using the Tag object directly
tag = self.client.tag.create(tag_name)
# Create then delete using the Tag object directly
self.photos[0].update(tagsAdd=tag_id)
tag = [t for t in self.client.tags.list() if t.id == tag_id][0]
self.assertTrue(tag.delete())
# Check that the tag is now gone
self.assertNotIn(tag_name, self.client.tags.list())
self.assertNotIn(tag_id, [t.id for t in self.client.tags.list()])
@unittest.expectedFailure # Tag update fails - Issue #927
# NOTE: the below has not been tested/debugged, since it fails at the first step
# TODO: Un-skip and update this tests once there are tag fields that can be updated.
# The owner field cannot be updated.
@unittest.skip("Can't test the tag.update endpoint, since there are no fields that can be updated")
def test_update(self):
""" Test that a tag can be updated """
# Update the tag using the OpenPhoto class, passing in the tag object
@ -59,15 +61,17 @@ class TestTags(test_base.TestBase):
self.assertEqual(self.tags[0].owner, owner)
self.assertEqual(ret_val.owner, owner)
@unittest.expectedFailure # Tag create fails - Issue #927
# NOTE: the below has not been tested/debugged, since it fails at the first step
def test_tag_with_spaces(self):
""" Run test_create_delete using a tag containing spaces """
self.test_create_delete("tag with spaces")
# We mustn't run this test until Issue #919 is resolved,
# since it creates an undeletable tag
@unittest.skip("Tags with double-slashes cannot be deleted - Issue #919")
def test_tag_with_double_slashes(self):
def test_tag_with_slashes(self):
""" Run test_create_delete using a tag containing slashes """
self.test_create_delete("tag/with//slashes")
self.test_create_delete("tag/with/slashes")
# TODO: Un-skip this test once issue #919 is resolved -
# tags with double-slashes cannot be deleted
@unittest.expectedFailure
def test_tag_with_double_slashes(self):
""" Run test_create_delete using a tag containing double-slashes """
self.test_create_delete("tag//with//double//slashes")