Merge branch 'master' into logging

Conflicts:
	openphoto/openphoto_http.py
This commit is contained in:
sneakypete81 2013-04-02 21:10:42 +01:00
commit 95b58a0e3a
13 changed files with 122 additions and 58 deletions

View file

@ -41,7 +41,7 @@ class ApiAlbum:
album.update(**kwds)
# Don't return the album, since the API currently doesn't give us the modified album
# Uncomment the following once frontend issue #937 is resolved
# TODO: Uncomment the following once frontend issue #937 is resolved
# return album
def view(self, album, **kwds):

View file

@ -67,7 +67,9 @@ class ApiPhoto:
return photo
def upload(self, photo_file, **kwds):
raise NotImplementedError("Use upload_encoded instead.")
result = self._client.post("/photo/upload.json", files={'photo': photo_file},
**kwds)["result"]
return Photo(self._client, result)
def upload_encoded(self, photo_file, **kwds):
""" Base64-encodes and uploads the specified file """
@ -81,8 +83,8 @@ class ApiPhoto:
def next_previous(self, photo, **kwds):
"""
Returns a dict containing the next and previous photo objects,
given a photo in the middle.
Returns a dict containing the next and previous photo lists
(there may be more than one next/previous photo returned).
"""
if not isinstance(photo, Photo):
photo = Photo(self._client, {"id": photo})

View file

@ -47,7 +47,8 @@ def main(args=sys.argv[1:]):
if options.method == "GET":
result = client.get(options.endpoint, process_response=False, **params)
else:
result = client.post(options.endpoint, process_response=False, **params)
params, files = extract_files(params)
result = client.post(options.endpoint, process_response=False, files=files, **params)
if options.verbose:
print "==========\nMethod: %s\nHost: %s\nEndpoint: %s" % (options.method, options.host, options.endpoint)
@ -62,5 +63,24 @@ def main(args=sys.argv[1:]):
else:
print result
def extract_files(params):
"""
Extract filenames from the "photo" parameter, so they can be uploaded, returning (updated_params, files).
Uses the same technique as openphoto-php:
* Filename can only be in the "photo" parameter
* Filename must be prefixed with "@"
* Filename must exist
...otherwise the parameter is not extracted
"""
files = {}
updated_params = {}
for name in params:
if name == "photo" and params[name].startswith("@") and os.path.isfile(os.path.expanduser(params[name][1:])):
files[name] = params[name][1:]
else:
updated_params[name] = params[name]
return updated_params, files
if __name__ == "__main__":
main()

View file

@ -0,0 +1,31 @@
import os
import mimetypes
import mimetools
def encode_multipart_formdata(params, files):
boundary = mimetools.choose_boundary()
lines = []
for name in params:
lines.append("--" + boundary)
lines.append("Content-Disposition: form-data; name=\"%s\"" % name)
lines.append("")
lines.append(str(params[name]))
for name in files:
filename = files[name]
content_type, _ = mimetypes.guess_type(filename)
if content_type is None:
content_type = "application/octet-stream"
lines.append("--" + boundary)
lines.append("Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"" % (name, filename))
lines.append("Content-Type: %s" % content_type)
lines.append("")
lines.append(open(os.path.expanduser(filename), "rb").read())
lines.append("--" + boundary + "--")
lines.append("")
body = "\r\n".join(lines)
headers = {'Content-Type': "multipart/form-data; boundary=%s" % boundary,
'Content-Length': str(len(body))}
return headers, body

View file

@ -74,14 +74,21 @@ class Photo(OpenPhotoObject):
raise NotImplementedError()
def next_previous(self, **kwds):
""" Returns a dict containing the next and previous photo objects """
"""
Returns a dict containing the next and previous photo lists
(there may be more than one next/previous photo returned).
"""
result = self._openphoto.get("/photo/%s/nextprevious.json" % self.id,
**kwds)["result"]
value = {}
if "next" in result:
value["next"] = Photo(self._openphoto, result["next"])
value["next"] = []
for photo in result["next"]:
value["next"].append(Photo(self._openphoto, photo))
if "previous" in result:
value["previous"] = Photo(self._openphoto, result["previous"])
value["previous"] = []
for photo in result["previous"]:
value["previous"].append(Photo(self._openphoto, photo))
return value
def transform(self, **kwds):

View file

@ -1,6 +1,7 @@
import oauth2 as oauth
import urlparse
import urllib
import urllib2
import httplib2
import logging
try:
@ -10,6 +11,7 @@ except ImportError:
from objects import OpenPhotoObject
from errors import *
from multipart_post import encode_multipart_formdata
DUPLICATE_RESPONSE = {"code": 409,
"message": "This photo already exists"}
@ -66,7 +68,7 @@ class OpenPhotoHttp:
else:
return content
def post(self, endpoint, process_response=True, **params):
def post(self, endpoint, process_response=True, files = {}, **params):
"""
Performs an HTTP POST to the specified endpoint (API path),
passing parameters if given.
@ -82,10 +84,16 @@ class OpenPhotoHttp:
consumer = oauth.Consumer(self._consumer_key, self._consumer_secret)
token = oauth.Token(self._token, self._token_secret)
client = oauth.Client(consumer, token)
body = urllib.urlencode(params)
if files:
# Parameters must be signed and encoded into the multipart body
params = self._sign_params(client, url, params)
headers, body = encode_multipart_formdata(params, files)
request = urllib2.Request(url, body, headers)
content = urllib2.urlopen(request).read()
else:
body = urllib.urlencode(params)
_, content = client.request(url, "POST", body)
# TODO: Don't log file data in multipart forms
@ -105,6 +113,17 @@ class OpenPhotoHttp:
else:
return content
@staticmethod
def _sign_params(client, url, params):
"""Use OAuth to sign a dictionary of params"""
request = oauth.Request.from_consumer_and_token(consumer=client.consumer,
token=client.token,
http_method="POST",
http_url=url,
parameters=params)
request.sign_request(client.method, client.consumer, client.token)
return dict(urlparse.parse_qsl(request.to_postdata()))
@staticmethod
def _process_params(params):
""" Converts Unicode/lists/booleans inside HTTP parameters """
@ -161,6 +180,8 @@ class OpenPhotoHttp:
@staticmethod
def _result_to_list(result):
""" Handle the case where the result contains no items """
if not result:
return []
if result[0]["totalRows"] == 0:
return []
else:

View file

@ -13,7 +13,7 @@ A computer, Python 2.7 and an empty OpenPhoto instance.
Create a tests/tokens.py file containing the following:
# tests/token.py
# tests/tokens.py
consumer_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
consumer_secret = "xxxxxxxxxx"
token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View file

@ -7,7 +7,7 @@ class TestAlbums(test_base.TestBase):
def test_create_delete(self):
""" Create an album then delete it """
album_name = "create_delete_album"
album = self.client.album.create(album_name, visible=True)
album = self.client.album.create(album_name)
# Check the return value
self.assertEqual(album.name, album_name)
@ -20,7 +20,7 @@ class TestAlbums(test_base.TestBase):
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
# Create it again, and delete it using the Album object
album = self.client.album.create(album_name, visible=True)
album = self.client.album.create(album_name)
album.delete()
# Check that the album is now gone
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
@ -56,23 +56,11 @@ class TestAlbums(test_base.TestBase):
self.assertFalse(hasattr(album, "photos"))
# Get the photos in the album using the Album object directly
album.view()
album.view(includeElements=True)
# Make sure all photos are in the album
for photo in self.photos:
self.assertIn(photo.id, [p.id for p in album.photos])
@unittest.expectedFailure # Private albums are not visible - issue #929
def test_private(self):
""" Test that private albums can be created, and are visible """
# Create and check that the album now exists
album_name = "private_album"
album = self.client.album.create(album_name, visible=False)
self.assertIn(album_name, [a.name for a in self.client.albums.list()])
# Delete and check that the album is now gone
album.delete()
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
def test_form(self):
""" If album.form gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):

View file

@ -68,6 +68,7 @@ class TestBase(unittest.TestCase):
"""
self.photos = self.client.photos.list()
if len(self.photos) != 3:
# print self.photos
print "[Regenerating Photos]"
if len(self.photos) > 0:
self._delete_all()
@ -77,7 +78,7 @@ class TestBase(unittest.TestCase):
self.tags = self.client.tags.list()
if (len(self.tags) != 1 or
self.tags[0].id != self.TEST_TAG or
self.tags[0].count != "3"):
self.tags[0].count != 3):
print "[Regenerating Tags]"
self._delete_all()
self._create_test_photos()
@ -109,22 +110,21 @@ class TestBase(unittest.TestCase):
@classmethod
def _create_test_photos(cls):
""" Upload three test photos """
album = cls.client.album.create(cls.TEST_ALBUM, visible=True)
album = cls.client.album.create(cls.TEST_ALBUM)
photos = [
cls.client.photo.upload_encoded("tests/test_photo1.jpg",
cls.client.photo.upload("tests/test_photo1.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
cls.client.photo.upload_encoded("tests/test_photo2.jpg",
albums=album.id),
cls.client.photo.upload("tests/test_photo2.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
cls.client.photo.upload_encoded("tests/test_photo3.jpg",
albums=album.id),
cls.client.photo.upload("tests/test_photo3.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
albums=album.id),
]
# Remove the auto-generated month/year tags
tags_to_remove = [p for p in photos[0].tags if p != cls.TEST_TAG]
# Add the test tag, removing any autogenerated tags
for photo in photos:
photo.update(tagsRemove=tags_to_remove, albums=album.id)
photo.update(tags=cls.TEST_TAG)
@classmethod
def _delete_all(cls):

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.4 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 854 B

After

Width:  |  Height:  |  Size: 910 B

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 579 B

After

Width:  |  Height:  |  Size: 635 B

Before After
Before After

View file

@ -15,10 +15,10 @@ class TestPhotos(test_base.TestBase):
# Check that they're gone
self.assertEqual(self.client.photos.list(), [])
# Re-upload the photos
ret_val = self.client.photo.upload_encoded("tests/test_photo1.jpg",
# Re-upload the photos, one of them using Bas64 encoding
ret_val = self.client.photo.upload("tests/test_photo1.jpg",
title=self.TEST_TITLE)
self.client.photo.upload_encoded("tests/test_photo2.jpg",
self.client.photo.upload("tests/test_photo2.jpg",
title=self.TEST_TITLE)
self.client.photo.upload_encoded("tests/test_photo3.jpg",
title=self.TEST_TITLE)
@ -56,7 +56,7 @@ class TestPhotos(test_base.TestBase):
""" Ensure that duplicate photos are rejected """
# Attempt to upload a duplicate
with self.assertRaises(openphoto.OpenPhotoDuplicateError):
self.client.photo.upload_encoded("tests/test_photo1.jpg",
self.client.photo.upload("tests/test_photo1.jpg",
title=self.TEST_TITLE)
# Check there are still three photos
@ -123,13 +123,13 @@ class TestPhotos(test_base.TestBase):
def test_next_previous(self):
""" Test the next/previous links of the middle photo """
next_prev = self.client.photo.next_previous(self.photos[1])
self.assertEqual(next_prev["previous"].id, self.photos[0].id)
self.assertEqual(next_prev["next"].id, self.photos[2].id)
self.assertEqual(next_prev["previous"][0].id, self.photos[0].id)
self.assertEqual(next_prev["next"][0].id, self.photos[2].id)
# Do the same using the Photo object directly
next_prev = self.photos[1].next_previous()
self.assertEqual(next_prev["previous"].id, self.photos[0].id)
self.assertEqual(next_prev["next"].id, self.photos[2].id)
self.assertEqual(next_prev["previous"][0].id, self.photos[0].id)
self.assertEqual(next_prev["next"][0].id, self.photos[2].id)
def test_replace(self):
""" If photo.replace gets implemented, write a test! """
@ -141,11 +141,6 @@ class TestPhotos(test_base.TestBase):
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.replace_encoded(None, None)
def test_upload(self):
""" If photo.upload gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.upload(None)
def test_dynamic_url(self):
""" If photo.dynamic_url gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):