Merge branch 'master' into transform

This commit is contained in:
sneakypete81 2013-04-13 13:05:14 +01:00
commit ee3840de9f
16 changed files with 212 additions and 82 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ build
dist
*.egg-info
tests/tokens.py
tests.log

View file

@ -20,10 +20,14 @@ class ApiAlbum:
return Album(self._client, result)
def delete(self, album, **kwds):
""" Delete an album """
"""
Delete an album.
Returns True if successful.
Raises an OpenPhotoError if not.
"""
if not isinstance(album, Album):
album = Album(self._client, {"id": album})
album.delete(**kwds)
return album.delete(**kwds)
def form(self, album, **kwds):
raise NotImplementedError()
@ -41,7 +45,7 @@ class ApiAlbum:
album.update(**kwds)
# Don't return the album, since the API currently doesn't give us the modified album
# Uncomment the following once frontend issue #937 is resolved
# TODO: Uncomment the following once frontend issue #937 is resolved
# return album
def view(self, album, **kwds):

View file

@ -14,25 +14,38 @@ class ApiPhotos:
return [Photo(self._client, photo) for photo in photos]
def update(self, photos, **kwds):
""" Updates a list of photos """
"""
Updates a list of photos.
Returns True if successful.
Raises OpenPhotoError if not.
"""
if not self._client.post("/photos/update.json", ids=photos, **kwds)["result"]:
raise OpenPhotoError("Update response returned False")
return True
def delete(self, photos, **kwds):
""" Deletes a list of photos """
"""
Deletes a list of photos.
Returns True if successful.
Raises OpenPhotoError if not.
"""
if not self._client.post("/photos/delete.json", ids=photos, **kwds)["result"]:
raise OpenPhotoError("Delete response returned False")
return True
class ApiPhoto:
def __init__(self, client):
self._client = client
def delete(self, photo, **kwds):
""" Delete a photo """
"""
Delete a photo.
Returns True if successful.
Raises an OpenPhotoError if not.
"""
if not isinstance(photo, Photo):
photo = Photo(self._client, {"id": photo})
photo.delete(**kwds)
return photo.delete(**kwds)
def edit(self, photo, **kwds):
""" Returns an HTML form to edit a photo """
@ -67,7 +80,9 @@ class ApiPhoto:
return photo
def upload(self, photo_file, **kwds):
raise NotImplementedError("Use upload_encoded instead.")
result = self._client.post("/photo/upload.json", files={'photo': photo_file},
**kwds)["result"]
return Photo(self._client, result)
def upload_encoded(self, photo_file, **kwds):
""" Base64-encodes and uploads the specified file """
@ -81,8 +96,8 @@ class ApiPhoto:
def next_previous(self, photo, **kwds):
"""
Returns a dict containing the next and previous photo objects,
given a photo in the middle.
Returns a dict containing the next and previous photo lists
(there may be more than one next/previous photo returned).
"""
if not isinstance(photo, Photo):
photo = Photo(self._client, {"id": photo})

View file

@ -20,10 +20,14 @@ class ApiTag:
return Tag(self._client, result)
def delete(self, tag, **kwds):
""" Delete a tag """
"""
Delete a tag.
Returns True if successful.
Raises an OpenPhotoError if not.
"""
if not isinstance(tag, Tag):
tag = Tag(self._client, {"id": tag})
tag.delete(**kwds)
return tag.delete(**kwds)
def update(self, tag, **kwds):
""" Update a tag """

View file

@ -47,7 +47,8 @@ def main(args=sys.argv[1:]):
if options.method == "GET":
result = client.get(options.endpoint, process_response=False, **params)
else:
result = client.post(options.endpoint, process_response=False, **params)
params, files = extract_files(params)
result = client.post(options.endpoint, process_response=False, files=files, **params)
if options.verbose:
print "==========\nMethod: %s\nHost: %s\nEndpoint: %s" % (options.method, options.host, options.endpoint)
@ -62,5 +63,24 @@ def main(args=sys.argv[1:]):
else:
print result
def extract_files(params):
"""
Extract filenames from the "photo" parameter, so they can be uploaded, returning (updated_params, files).
Uses the same technique as openphoto-php:
* Filename can only be in the "photo" parameter
* Filename must be prefixed with "@"
* Filename must exist
...otherwise the parameter is not extracted
"""
files = {}
updated_params = {}
for name in params:
if name == "photo" and params[name].startswith("@") and os.path.isfile(os.path.expanduser(params[name][1:])):
files[name] = params[name][1:]
else:
updated_params[name] = params[name]
return updated_params, files
if __name__ == "__main__":
main()

View file

@ -0,0 +1,31 @@
import os
import mimetypes
import mimetools
def encode_multipart_formdata(params, files):
boundary = mimetools.choose_boundary()
lines = []
for name in params:
lines.append("--" + boundary)
lines.append("Content-Disposition: form-data; name=\"%s\"" % name)
lines.append("")
lines.append(str(params[name]))
for name in files:
filename = files[name]
content_type, _ = mimetypes.guess_type(filename)
if content_type is None:
content_type = "application/octet-stream"
lines.append("--" + boundary)
lines.append("Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"" % (name, filename))
lines.append("Content-Type: %s" % content_type)
lines.append("")
lines.append(open(os.path.expanduser(filename), "rb").read())
lines.append("--" + boundary + "--")
lines.append("")
body = "\r\n".join(lines)
headers = {'Content-Type': "multipart/form-data; boundary=%s" % boundary,
'Content-Length': str(len(body))}
return headers, body

View file

@ -39,9 +39,14 @@ class OpenPhotoObject:
class Photo(OpenPhotoObject):
def delete(self, **kwds):
""" Delete this photo """
self._openphoto.post("/photo/%s/delete.json" % self.id, **kwds)
"""
Delete this photo.
Returns True if successful.
Raises an OpenPhotoError if not.
"""
result = self._openphoto.post("/photo/%s/delete.json" % self.id, **kwds)["result"]
self._replace_fields({})
return result
def edit(self, **kwds):
""" Returns an HTML form to edit the photo """
@ -74,14 +79,21 @@ class Photo(OpenPhotoObject):
raise NotImplementedError()
def next_previous(self, **kwds):
""" Returns a dict containing the next and previous photo objects """
"""
Returns a dict containing the next and previous photo lists
(there may be more than one next/previous photo returned).
"""
result = self._openphoto.get("/photo/%s/nextprevious.json" % self.id,
**kwds)["result"]
value = {}
if "next" in result:
value["next"] = Photo(self._openphoto, result["next"])
value["next"] = []
for photo in result["next"]:
value["next"].append(Photo(self._openphoto, photo))
if "previous" in result:
value["previous"] = Photo(self._openphoto, result["previous"])
value["previous"] = []
for photo in result["previous"]:
value["previous"].append(Photo(self._openphoto, photo))
return value
def transform(self, **kwds):
@ -97,9 +109,14 @@ class Photo(OpenPhotoObject):
class Tag(OpenPhotoObject):
def delete(self, **kwds):
""" Delete this tag """
self._openphoto.post("/tag/%s/delete.json" % self.id, **kwds)
"""
Delete this tag.
Returns True if successful.
Raises an OpenPhotoError if not.
"""
result = self._openphoto.post("/tag/%s/delete.json" % self.id, **kwds)["result"]
self._replace_fields({})
return result
def update(self, **kwds):
""" Update this tag with the specified parameters """
@ -125,9 +142,14 @@ class Album(OpenPhotoObject):
self.photos[i] = Photo(self._openphoto, photo)
def delete(self, **kwds):
""" Delete this album """
self._openphoto.post("/album/%s/delete.json" % self.id, **kwds)
"""
Delete this album.
Returns True if successful.
Raises an OpenPhotoError if not.
"""
result = self._openphoto.post("/album/%s/delete.json" % self.id, **kwds)["result"]
self._replace_fields({})
return result
def form(self, **kwds):
raise NotImplementedError()

View file

@ -1,7 +1,9 @@
import oauth2 as oauth
import urlparse
import urllib
import urllib2
import httplib2
import logging
try:
import json
except ImportError:
@ -9,6 +11,7 @@ except ImportError:
from objects import OpenPhotoObject
from errors import *
from multipart_post import encode_multipart_formdata
DUPLICATE_RESPONSE = {"code": 409,
"message": "This photo already exists"}
@ -23,6 +26,8 @@ class OpenPhotoHttp:
self._token = token
self._token_secret = token_secret
self._logger = logging.getLogger("openphoto")
# Remember the most recent HTTP request and response
self.last_url = None
self.last_params = None
@ -48,6 +53,11 @@ class OpenPhotoHttp:
_, content = client.request(url, "GET")
self._logger.info("============================")
self._logger.info("GET %s" % url)
self._logger.info("---")
self._logger.info(content)
self.last_url = url
self.last_params = params
self.last_response = content
@ -58,7 +68,7 @@ class OpenPhotoHttp:
else:
return content
def post(self, endpoint, process_response=True, **params):
def post(self, endpoint, process_response=True, files = {}, **params):
"""
Performs an HTTP POST to the specified endpoint (API path),
passing parameters if given.
@ -74,11 +84,26 @@ class OpenPhotoHttp:
consumer = oauth.Consumer(self._consumer_key, self._consumer_secret)
token = oauth.Token(self._token, self._token_secret)
client = oauth.Client(consumer, token)
if files:
# Parameters must be signed and encoded into the multipart body
params = self._sign_params(client, url, params)
headers, body = encode_multipart_formdata(params, files)
request = urllib2.Request(url, body, headers)
content = urllib2.urlopen(request).read()
else:
body = urllib.urlencode(params)
_, content = client.request(url, "POST", body)
# TODO: Don't log file data in multipart forms
self._logger.info("============================")
self._logger.info("POST %s" % url)
if body:
self._logger.info(body)
self._logger.info("---")
self._logger.info(content)
self.last_url = url
self.last_params = params
self.last_response = content
@ -88,6 +113,17 @@ class OpenPhotoHttp:
else:
return content
@staticmethod
def _sign_params(client, url, params):
"""Use OAuth to sign a dictionary of params"""
request = oauth.Request.from_consumer_and_token(consumer=client.consumer,
token=client.token,
http_method="POST",
http_url=url,
parameters=params)
request.sign_request(client.method, client.consumer, client.token)
return dict(urlparse.parse_qsl(request.to_postdata()))
@staticmethod
def _process_params(params):
""" Converts Unicode/lists/booleans inside HTTP parameters """
@ -144,6 +180,8 @@ class OpenPhotoHttp:
@staticmethod
def _result_to_list(result):
""" Handle the case where the result contains no items """
if not result:
return []
if result[0]["totalRows"] == 0:
return []
else:

View file

@ -13,7 +13,7 @@ A computer, Python 2.7 and an empty OpenPhoto instance.
Create a tests/tokens.py file containing the following:
# tests/token.py
# tests/tokens.py
consumer_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
consumer_secret = "xxxxxxxxxx"
token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View file

@ -7,7 +7,7 @@ class TestAlbums(test_base.TestBase):
def test_create_delete(self):
""" Create an album then delete it """
album_name = "create_delete_album"
album = self.client.album.create(album_name, visible=True)
album = self.client.album.create(album_name)
# Check the return value
self.assertEqual(album.name, album_name)
@ -15,13 +15,13 @@ class TestAlbums(test_base.TestBase):
self.assertIn(album_name, [a.name for a in self.client.albums.list()])
# Delete the album
self.client.album.delete(album.id)
self.assertTrue(self.client.album.delete(album.id))
# Check that the album is now gone
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
# Create it again, and delete it using the Album object
album = self.client.album.create(album_name, visible=True)
album.delete()
album = self.client.album.create(album_name)
self.assertTrue(album.delete())
# Check that the album is now gone
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
@ -56,23 +56,11 @@ class TestAlbums(test_base.TestBase):
self.assertFalse(hasattr(album, "photos"))
# Get the photos in the album using the Album object directly
album.view()
album.view(includeElements=True)
# Make sure all photos are in the album
for photo in self.photos:
self.assertIn(photo.id, [p.id for p in album.photos])
@unittest.expectedFailure # Private albums are not visible - issue #929
def test_private(self):
""" Test that private albums can be created, and are visible """
# Create and check that the album now exists
album_name = "private_album"
album = self.client.album.create(album_name, visible=False)
self.assertIn(album_name, [a.name for a in self.client.albums.list()])
# Delete and check that the album is now gone
album.delete()
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
def test_form(self):
""" If album.form gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):

View file

@ -1,4 +1,5 @@
import unittest
import logging
import openphoto
try:
@ -26,6 +27,12 @@ class TestBase(unittest.TestCase):
unittest.TestCase.__init__(self, *args, **kwds)
self.photos = []
LOG_FILENAME = "tests.log"
logging.basicConfig(filename="tests.log",
filemode="w",
format="%(message)s",
level=logging.INFO)
@classmethod
def setUpClass(cls):
""" Ensure there is nothing on the server before running any tests """
@ -61,6 +68,7 @@ class TestBase(unittest.TestCase):
"""
self.photos = self.client.photos.list()
if len(self.photos) != 3:
# print self.photos
print "[Regenerating Photos]"
if len(self.photos) > 0:
self._delete_all()
@ -70,7 +78,7 @@ class TestBase(unittest.TestCase):
self.tags = self.client.tags.list()
if (len(self.tags) != 1 or
self.tags[0].id != self.TEST_TAG or
self.tags[0].count != "3"):
self.tags[0].count != 3):
print "[Regenerating Tags]"
self._delete_all()
self._create_test_photos()
@ -94,25 +102,29 @@ class TestBase(unittest.TestCase):
print "Albums: %s" % self.albums
raise Exception("Album creation failed")
logging.info("\nRunning %s..." % self.id())
def tearDown(self):
logging.info("Finished %s\n" % self.id())
@classmethod
def _create_test_photos(cls):
""" Upload three test photos """
album = cls.client.album.create(cls.TEST_ALBUM, visible=True)
album = cls.client.album.create(cls.TEST_ALBUM)
photos = [
cls.client.photo.upload_encoded("tests/test_photo1.jpg",
cls.client.photo.upload("tests/test_photo1.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
cls.client.photo.upload_encoded("tests/test_photo2.jpg",
albums=album.id),
cls.client.photo.upload("tests/test_photo2.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
cls.client.photo.upload_encoded("tests/test_photo3.jpg",
albums=album.id),
cls.client.photo.upload("tests/test_photo3.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
albums=album.id),
]
# Remove the auto-generated month/year tags
tags_to_remove = [p for p in photos[0].tags if p != cls.TEST_TAG]
# Add the test tag, removing any autogenerated tags
for photo in photos:
photo.update(tagsRemove=tags_to_remove, albums=album.id)
photo.update(tags=cls.TEST_TAG)
@classmethod
def _delete_all(cls):

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.4 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 854 B

After

Width:  |  Height:  |  Size: 910 B

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 579 B

After

Width:  |  Height:  |  Size: 635 B

Before After
Before After

View file

@ -6,19 +6,19 @@ class TestPhotos(test_base.TestBase):
def test_delete_upload(self):
""" Test photo deletion and upload """
# Delete one photo using the OpenPhoto class, passing in the id
self.client.photo.delete(self.photos[0].id)
self.assertTrue(self.client.photo.delete(self.photos[0].id))
# Delete one photo using the OpenPhoto class, passing in the object
self.client.photo.delete(self.photos[1])
self.assertTrue(self.client.photo.delete(self.photos[1]))
# And another using the Photo object directly
self.photos[2].delete()
self.assertTrue(self.photos[2].delete())
# Check that they're gone
self.assertEqual(self.client.photos.list(), [])
# Re-upload the photos
ret_val = self.client.photo.upload_encoded("tests/test_photo1.jpg",
# Re-upload the photos, one of them using Bas64 encoding
ret_val = self.client.photo.upload("tests/test_photo1.jpg",
title=self.TEST_TITLE)
self.client.photo.upload_encoded("tests/test_photo2.jpg",
self.client.photo.upload("tests/test_photo2.jpg",
title=self.TEST_TITLE)
self.client.photo.upload_encoded("tests/test_photo3.jpg",
title=self.TEST_TITLE)
@ -32,7 +32,7 @@ class TestPhotos(test_base.TestBase):
self.assertIn(ret_val.pathOriginal, pathOriginals)
# Delete all photos in one go
self.client.photos.delete(self.photos)
self.assertTrue(self.client.photos.delete(self.photos))
# Check they're gone
self.photos = self.client.photos.list()
@ -56,7 +56,7 @@ class TestPhotos(test_base.TestBase):
""" Ensure that duplicate photos are rejected """
# Attempt to upload a duplicate
with self.assertRaises(openphoto.OpenPhotoDuplicateError):
self.client.photo.upload_encoded("tests/test_photo1.jpg",
self.client.photo.upload("tests/test_photo1.jpg",
title=self.TEST_TITLE)
# Check there are still three photos
@ -123,13 +123,13 @@ class TestPhotos(test_base.TestBase):
def test_next_previous(self):
""" Test the next/previous links of the middle photo """
next_prev = self.client.photo.next_previous(self.photos[1])
self.assertEqual(next_prev["previous"].id, self.photos[0].id)
self.assertEqual(next_prev["next"].id, self.photos[2].id)
self.assertEqual(next_prev["previous"][0].id, self.photos[0].id)
self.assertEqual(next_prev["next"][0].id, self.photos[2].id)
# Do the same using the Photo object directly
next_prev = self.photos[1].next_previous()
self.assertEqual(next_prev["previous"].id, self.photos[0].id)
self.assertEqual(next_prev["next"].id, self.photos[2].id)
self.assertEqual(next_prev["previous"][0].id, self.photos[0].id)
self.assertEqual(next_prev["next"][0].id, self.photos[2].id)
def test_replace(self):
""" If photo.replace gets implemented, write a test! """
@ -141,11 +141,6 @@ class TestPhotos(test_base.TestBase):
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.replace_encoded(None, None)
def test_upload(self):
""" If photo.upload gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.upload(None)
def test_dynamic_url(self):
""" If photo.dynamic_url gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):

View file

@ -16,13 +16,13 @@ class TestTags(test_base.TestBase):
self.assertIn(tag_name, self.client.tags.list())
# Delete the tag
self.client.tag.delete(tag_name)
self.assertTrue(self.client.tag.delete(tag_name))
# Check that the tag is now gone
self.assertNotIn(tag_name, self.client.tags.list())
# Create and delete using the Tag object directly
tag = self.client.tag.create(tag_name)
tag.delete()
self.assertTrue(tag.delete())
# Check that the tag is now gone
self.assertNotIn(tag_name, self.client.tags.list())