Merge remote-tracking branch 'upstream/master' into apiv2_frontend_bugfixes
ecially if it merges an updated upstream into a topic branch.
This commit is contained in:
commit
fcf43377dd
11 changed files with 125 additions and 28 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -4,3 +4,4 @@ build
|
||||||
dist
|
dist
|
||||||
*.egg-info
|
*.egg-info
|
||||||
tests/tokens.py
|
tests/tokens.py
|
||||||
|
tests.log
|
||||||
|
|
|
@ -67,7 +67,9 @@ class ApiPhoto:
|
||||||
return photo
|
return photo
|
||||||
|
|
||||||
def upload(self, photo_file, **kwds):
|
def upload(self, photo_file, **kwds):
|
||||||
raise NotImplementedError("Use upload_encoded instead.")
|
result = self._client.post("/photo/upload.json", files={'photo': photo_file},
|
||||||
|
**kwds)["result"]
|
||||||
|
return Photo(self._client, result)
|
||||||
|
|
||||||
def upload_encoded(self, photo_file, **kwds):
|
def upload_encoded(self, photo_file, **kwds):
|
||||||
""" Base64-encodes and uploads the specified file """
|
""" Base64-encodes and uploads the specified file """
|
||||||
|
|
|
@ -47,7 +47,8 @@ def main(args=sys.argv[1:]):
|
||||||
if options.method == "GET":
|
if options.method == "GET":
|
||||||
result = client.get(options.endpoint, process_response=False, **params)
|
result = client.get(options.endpoint, process_response=False, **params)
|
||||||
else:
|
else:
|
||||||
result = client.post(options.endpoint, process_response=False, **params)
|
params, files = extract_files(params)
|
||||||
|
result = client.post(options.endpoint, process_response=False, files=files, **params)
|
||||||
|
|
||||||
if options.verbose:
|
if options.verbose:
|
||||||
print "==========\nMethod: %s\nHost: %s\nEndpoint: %s" % (options.method, options.host, options.endpoint)
|
print "==========\nMethod: %s\nHost: %s\nEndpoint: %s" % (options.method, options.host, options.endpoint)
|
||||||
|
@ -62,5 +63,24 @@ def main(args=sys.argv[1:]):
|
||||||
else:
|
else:
|
||||||
print result
|
print result
|
||||||
|
|
||||||
|
def extract_files(params):
|
||||||
|
"""
|
||||||
|
Extract filenames from the "photo" parameter, so they can be uploaded, returning (updated_params, files).
|
||||||
|
Uses the same technique as openphoto-php:
|
||||||
|
* Filename can only be in the "photo" parameter
|
||||||
|
* Filename must be prefixed with "@"
|
||||||
|
* Filename must exist
|
||||||
|
...otherwise the parameter is not extracted
|
||||||
|
"""
|
||||||
|
files = {}
|
||||||
|
updated_params = {}
|
||||||
|
for name in params:
|
||||||
|
if name == "photo" and params[name].startswith("@") and os.path.isfile(os.path.expanduser(params[name][1:])):
|
||||||
|
files[name] = params[name][1:]
|
||||||
|
else:
|
||||||
|
updated_params[name] = params[name]
|
||||||
|
|
||||||
|
return updated_params, files
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
31
openphoto/multipart_post.py
Normal file
31
openphoto/multipart_post.py
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
import os
|
||||||
|
import mimetypes
|
||||||
|
import mimetools
|
||||||
|
|
||||||
|
def encode_multipart_formdata(params, files):
|
||||||
|
boundary = mimetools.choose_boundary()
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
for name in params:
|
||||||
|
lines.append("--" + boundary)
|
||||||
|
lines.append("Content-Disposition: form-data; name=\"%s\"" % name)
|
||||||
|
lines.append("")
|
||||||
|
lines.append(str(params[name]))
|
||||||
|
for name in files:
|
||||||
|
filename = files[name]
|
||||||
|
content_type, _ = mimetypes.guess_type(filename)
|
||||||
|
if content_type is None:
|
||||||
|
content_type = "application/octet-stream"
|
||||||
|
|
||||||
|
lines.append("--" + boundary)
|
||||||
|
lines.append("Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"" % (name, filename))
|
||||||
|
lines.append("Content-Type: %s" % content_type)
|
||||||
|
lines.append("")
|
||||||
|
lines.append(open(os.path.expanduser(filename), "rb").read())
|
||||||
|
lines.append("--" + boundary + "--")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
body = "\r\n".join(lines)
|
||||||
|
headers = {'Content-Type': "multipart/form-data; boundary=%s" % boundary,
|
||||||
|
'Content-Length': str(len(body))}
|
||||||
|
return headers, body
|
|
@ -1,7 +1,9 @@
|
||||||
import oauth2 as oauth
|
import oauth2 as oauth
|
||||||
import urlparse
|
import urlparse
|
||||||
import urllib
|
import urllib
|
||||||
|
import urllib2
|
||||||
import httplib2
|
import httplib2
|
||||||
|
import logging
|
||||||
try:
|
try:
|
||||||
import json
|
import json
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
@ -9,6 +11,7 @@ except ImportError:
|
||||||
|
|
||||||
from objects import OpenPhotoObject
|
from objects import OpenPhotoObject
|
||||||
from errors import *
|
from errors import *
|
||||||
|
from multipart_post import encode_multipart_formdata
|
||||||
|
|
||||||
DUPLICATE_RESPONSE = {"code": 409,
|
DUPLICATE_RESPONSE = {"code": 409,
|
||||||
"message": "This photo already exists"}
|
"message": "This photo already exists"}
|
||||||
|
@ -23,6 +26,8 @@ class OpenPhotoHttp:
|
||||||
self._token = token
|
self._token = token
|
||||||
self._token_secret = token_secret
|
self._token_secret = token_secret
|
||||||
|
|
||||||
|
self._logger = logging.getLogger("openphoto")
|
||||||
|
|
||||||
# Remember the most recent HTTP request and response
|
# Remember the most recent HTTP request and response
|
||||||
self.last_url = None
|
self.last_url = None
|
||||||
self.last_params = None
|
self.last_params = None
|
||||||
|
@ -48,6 +53,11 @@ class OpenPhotoHttp:
|
||||||
|
|
||||||
_, content = client.request(url, "GET")
|
_, content = client.request(url, "GET")
|
||||||
|
|
||||||
|
self._logger.info("============================")
|
||||||
|
self._logger.info("GET %s" % url)
|
||||||
|
self._logger.info("---")
|
||||||
|
self._logger.info(content)
|
||||||
|
|
||||||
self.last_url = url
|
self.last_url = url
|
||||||
self.last_params = params
|
self.last_params = params
|
||||||
self.last_response = content
|
self.last_response = content
|
||||||
|
@ -58,7 +68,7 @@ class OpenPhotoHttp:
|
||||||
else:
|
else:
|
||||||
return content
|
return content
|
||||||
|
|
||||||
def post(self, endpoint, process_response=True, **params):
|
def post(self, endpoint, process_response=True, files = {}, **params):
|
||||||
"""
|
"""
|
||||||
Performs an HTTP POST to the specified endpoint (API path),
|
Performs an HTTP POST to the specified endpoint (API path),
|
||||||
passing parameters if given.
|
passing parameters if given.
|
||||||
|
@ -74,10 +84,25 @@ class OpenPhotoHttp:
|
||||||
|
|
||||||
consumer = oauth.Consumer(self._consumer_key, self._consumer_secret)
|
consumer = oauth.Consumer(self._consumer_key, self._consumer_secret)
|
||||||
token = oauth.Token(self._token, self._token_secret)
|
token = oauth.Token(self._token, self._token_secret)
|
||||||
|
|
||||||
client = oauth.Client(consumer, token)
|
client = oauth.Client(consumer, token)
|
||||||
body = urllib.urlencode(params)
|
|
||||||
_, content = client.request(url, "POST", body)
|
if files:
|
||||||
|
# Parameters must be signed and encoded into the multipart body
|
||||||
|
params = self._sign_params(client, url, params)
|
||||||
|
headers, body = encode_multipart_formdata(params, files)
|
||||||
|
request = urllib2.Request(url, body, headers)
|
||||||
|
content = urllib2.urlopen(request).read()
|
||||||
|
else:
|
||||||
|
body = urllib.urlencode(params)
|
||||||
|
_, content = client.request(url, "POST", body)
|
||||||
|
|
||||||
|
# TODO: Don't log file data in multipart forms
|
||||||
|
self._logger.info("============================")
|
||||||
|
self._logger.info("POST %s" % url)
|
||||||
|
if body:
|
||||||
|
self._logger.info(body)
|
||||||
|
self._logger.info("---")
|
||||||
|
self._logger.info(content)
|
||||||
|
|
||||||
self.last_url = url
|
self.last_url = url
|
||||||
self.last_params = params
|
self.last_params = params
|
||||||
|
@ -88,6 +113,17 @@ class OpenPhotoHttp:
|
||||||
else:
|
else:
|
||||||
return content
|
return content
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _sign_params(client, url, params):
|
||||||
|
"""Use OAuth to sign a dictionary of params"""
|
||||||
|
request = oauth.Request.from_consumer_and_token(consumer=client.consumer,
|
||||||
|
token=client.token,
|
||||||
|
http_method="POST",
|
||||||
|
http_url=url,
|
||||||
|
parameters=params)
|
||||||
|
request.sign_request(client.method, client.consumer, client.token)
|
||||||
|
return dict(urlparse.parse_qsl(request.to_postdata()))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _process_params(params):
|
def _process_params(params):
|
||||||
""" Converts Unicode/lists/booleans inside HTTP parameters """
|
""" Converts Unicode/lists/booleans inside HTTP parameters """
|
||||||
|
|
|
@ -13,7 +13,7 @@ A computer, Python 2.7 and an empty OpenPhoto instance.
|
||||||
|
|
||||||
Create a tests/tokens.py file containing the following:
|
Create a tests/tokens.py file containing the following:
|
||||||
|
|
||||||
# tests/token.py
|
# tests/tokens.py
|
||||||
consumer_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
consumer_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
consumer_secret = "xxxxxxxxxx"
|
consumer_secret = "xxxxxxxxxx"
|
||||||
token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import unittest
|
import unittest
|
||||||
|
import logging
|
||||||
import openphoto
|
import openphoto
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -26,6 +27,12 @@ class TestBase(unittest.TestCase):
|
||||||
unittest.TestCase.__init__(self, *args, **kwds)
|
unittest.TestCase.__init__(self, *args, **kwds)
|
||||||
self.photos = []
|
self.photos = []
|
||||||
|
|
||||||
|
LOG_FILENAME = "tests.log"
|
||||||
|
logging.basicConfig(filename="tests.log",
|
||||||
|
filemode="w",
|
||||||
|
format="%(message)s",
|
||||||
|
level=logging.INFO)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
""" Ensure there is nothing on the server before running any tests """
|
""" Ensure there is nothing on the server before running any tests """
|
||||||
|
@ -95,20 +102,25 @@ class TestBase(unittest.TestCase):
|
||||||
print "Albums: %s" % self.albums
|
print "Albums: %s" % self.albums
|
||||||
raise Exception("Album creation failed")
|
raise Exception("Album creation failed")
|
||||||
|
|
||||||
|
logging.info("\nRunning %s..." % self.id())
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
logging.info("Finished %s\n" % self.id())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _create_test_photos(cls):
|
def _create_test_photos(cls):
|
||||||
""" Upload three test photos """
|
""" Upload three test photos """
|
||||||
album = cls.client.album.create(cls.TEST_ALBUM)
|
album = cls.client.album.create(cls.TEST_ALBUM)
|
||||||
photos = [
|
photos = [
|
||||||
cls.client.photo.upload_encoded("tests/test_photo1.jpg",
|
cls.client.photo.upload("tests/test_photo1.jpg",
|
||||||
title=cls.TEST_TITLE,
|
title=cls.TEST_TITLE,
|
||||||
albums=album.id),
|
albums=album.id),
|
||||||
cls.client.photo.upload_encoded("tests/test_photo2.jpg",
|
cls.client.photo.upload("tests/test_photo2.jpg",
|
||||||
title=cls.TEST_TITLE,
|
title=cls.TEST_TITLE,
|
||||||
albums=album.id),
|
albums=album.id),
|
||||||
cls.client.photo.upload_encoded("tests/test_photo3.jpg",
|
cls.client.photo.upload("tests/test_photo3.jpg",
|
||||||
title=cls.TEST_TITLE,
|
title=cls.TEST_TITLE,
|
||||||
albums=album.id),
|
albums=album.id),
|
||||||
]
|
]
|
||||||
# Add the test tag, removing any autogenerated tags
|
# Add the test tag, removing any autogenerated tags
|
||||||
for photo in photos:
|
for photo in photos:
|
||||||
|
|
Binary file not shown.
Before Width: | Height: | Size: 1.4 KiB After Width: | Height: | Size: 1.5 KiB |
Binary file not shown.
Before Width: | Height: | Size: 854 B After Width: | Height: | Size: 910 B |
Binary file not shown.
Before Width: | Height: | Size: 579 B After Width: | Height: | Size: 635 B |
|
@ -15,11 +15,11 @@ class TestPhotos(test_base.TestBase):
|
||||||
# Check that they're gone
|
# Check that they're gone
|
||||||
self.assertEqual(self.client.photos.list(), [])
|
self.assertEqual(self.client.photos.list(), [])
|
||||||
|
|
||||||
# Re-upload the photos
|
# Re-upload the photos, one of them using Bas64 encoding
|
||||||
ret_val = self.client.photo.upload_encoded("tests/test_photo1.jpg",
|
ret_val = self.client.photo.upload("tests/test_photo1.jpg",
|
||||||
title=self.TEST_TITLE)
|
title=self.TEST_TITLE)
|
||||||
self.client.photo.upload_encoded("tests/test_photo2.jpg",
|
self.client.photo.upload("tests/test_photo2.jpg",
|
||||||
title=self.TEST_TITLE)
|
title=self.TEST_TITLE)
|
||||||
self.client.photo.upload_encoded("tests/test_photo3.jpg",
|
self.client.photo.upload_encoded("tests/test_photo3.jpg",
|
||||||
title=self.TEST_TITLE)
|
title=self.TEST_TITLE)
|
||||||
|
|
||||||
|
@ -56,8 +56,8 @@ class TestPhotos(test_base.TestBase):
|
||||||
""" Ensure that duplicate photos are rejected """
|
""" Ensure that duplicate photos are rejected """
|
||||||
# Attempt to upload a duplicate
|
# Attempt to upload a duplicate
|
||||||
with self.assertRaises(openphoto.OpenPhotoDuplicateError):
|
with self.assertRaises(openphoto.OpenPhotoDuplicateError):
|
||||||
self.client.photo.upload_encoded("tests/test_photo1.jpg",
|
self.client.photo.upload("tests/test_photo1.jpg",
|
||||||
title=self.TEST_TITLE)
|
title=self.TEST_TITLE)
|
||||||
|
|
||||||
# Check there are still three photos
|
# Check there are still three photos
|
||||||
self.photos = self.client.photos.list()
|
self.photos = self.client.photos.list()
|
||||||
|
@ -141,11 +141,6 @@ class TestPhotos(test_base.TestBase):
|
||||||
with self.assertRaises(openphoto.NotImplementedError):
|
with self.assertRaises(openphoto.NotImplementedError):
|
||||||
self.client.photo.replace_encoded(None, None)
|
self.client.photo.replace_encoded(None, None)
|
||||||
|
|
||||||
def test_upload(self):
|
|
||||||
""" If photo.upload gets implemented, write a test! """
|
|
||||||
with self.assertRaises(openphoto.NotImplementedError):
|
|
||||||
self.client.photo.upload(None)
|
|
||||||
|
|
||||||
def test_dynamic_url(self):
|
def test_dynamic_url(self):
|
||||||
""" If photo.dynamic_url gets implemented, write a test! """
|
""" If photo.dynamic_url gets implemented, write a test! """
|
||||||
with self.assertRaises(openphoto.NotImplementedError):
|
with self.assertRaises(openphoto.NotImplementedError):
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue