Merge pull request #13 from sneakypete81/master

Make the library more Pythonic
This commit is contained in:
James Walker 2013-03-12 14:01:28 -07:00
commit 925342072d
20 changed files with 1061 additions and 61 deletions

3
.gitignore vendored
View file

@ -1,5 +1,6 @@
*~
*.pyc *.pyc
build build
dist dist
*.egg-info *.egg-info
tests/tokens.py

View file

@ -14,17 +14,39 @@ python setup.py install
To use the library you need to first ``import openphoto``, then instantiate an instance of the class and start making calls. To use the library you need to first ``import openphoto``, then instantiate an instance of the class and start making calls.
You can use the library in one of two ways:
* Direct GET/POST calls to the server
* Access via Python classes/methods
<a name="get_post"></a>
### Direct GET/POST:
from openphoto import OpenPhoto from openphoto import OpenPhoto
client = OpenPhoto(host, consumerKey, consumerSecret, token, tokenSecret) client = OpenPhoto(host, consumerKey, consumerSecret, token, tokenSecret)
resp = client.get('/photos/list.json') resp = client.get("/photos/list.json")
resp = client.post('/photo/62/update.json', {'tags': 'tag1,tag2'}) resp = client.post("/photo/62/update.json", tags=["tag1", "tag2"])
<a name="python_classes"></a>
### Python classes/methods
from openphoto import OpenPhoto
client = OpenPhoto(host, consumerKey, consumerSecret, token, tokenSecret)
photos = client.photos.list()
photos[0].update(tags=["tag1", "tag2"])
print photos[0].tags
The OpenPhoto Python class hierarchy mirrors the [OpenPhoto API](http://theopenphotoproject.org/documentation) endpoint layout. For example, the calls in the example above use the following API endpoints:
* client.photos.list() -> /photos/list.json
* photos[0].update() -> /photo/&lt;id&gt;/update.json
---------------------------------------- ----------------------------------------
<a name="cli"></a> <a name="cli"></a>
### Using from the command line ### Using from the command line
You'll then want to export your secrets to the environment. When using the command line tools, you'll want to export your secrets to the environment.
We suggest putting them in a file and sourcing it prior to running `openphoto` commands. We suggest putting them in a file and sourcing it prior to running `openphoto` commands.
<a href="#credentials">Click here for instructions on getting credentials</a>. <a href="#credentials">Click here for instructions on getting credentials</a>.
@ -97,3 +119,4 @@ Now you can run commands to the OpenPhoto API from your shell!
You can get your credentals by clicking on the arrow next to your email address once you're logged into your site and then clicking on settings. You can get your credentals by clicking on the arrow next to your email address once you're logged into your site and then clicking on settings.
If you don't have any credentials then you can create one for yourself by going to `/v1/oauth/flow`. If you don't have any credentials then you can create one for yourself by going to `/v1/oauth/flow`.
Once completed go back to the settings page and you should see the credential you just created Once completed go back to the settings page and you should see the credential you just created

View file

@ -1,50 +1,21 @@
import oauth2 as oauth from openphoto_http import OpenPhotoHttp
import urlparse from errors import *
import urllib import api_photo
import httplib2 import api_tag
import types import api_album
class OpenPhoto(OpenPhotoHttp):
class OpenPhoto(object):
""" Client library for OpenPhoto """ """ Client library for OpenPhoto """
def __init__(self, host,
def __init__(self, host, consumer_key='', consumer_secret='', consumer_key='', consumer_secret='',
token='', token_secret=''): token='', token_secret=''):
self.host = host OpenPhotoHttp.__init__(self, host,
self.consumer_key = consumer_key consumer_key, consumer_secret,
self.consumer_secret = consumer_secret token, token_secret)
self.token = token
self.token_secret = token_secret
def get(self, endpoint, params={}): self.photos = api_photo.ApiPhotos(self)
url = urlparse.urlunparse(('http', self.host, endpoint, '', self.photo = api_photo.ApiPhoto(self)
urllib.urlencode(params), '')) self.tags = api_tag.ApiTags(self)
if self.consumer_key: self.tag = api_tag.ApiTag(self)
consumer = oauth.Consumer(self.consumer_key, self.consumer_secret) self.albums = api_album.ApiAlbums(self)
token = oauth.Token(self.token, self.token_secret) self.album = api_album.ApiAlbum(self)
client = oauth.Client(consumer, token)
else:
client = httplib2.Http()
headers, content = client.request(url, "GET")
return content
def post(self, endpoint, params={}):
url = urlparse.urlunparse(('http', self.host, endpoint, '', '', ''))
if self.consumer_key:
consumer = oauth.Consumer(self.consumer_key, self.consumer_secret)
token = oauth.Token(self.token, self.token_secret)
# ensure utf-8 encoding for all values.
params = dict([(k, v.encode('utf-8')
if type(v) is types.UnicodeType else v)
for (k, v) in params.items()])
client = oauth.Client(consumer, token)
body = urllib.urlencode(params)
headers, content = client.request(url, "POST", body)
return content

55
openphoto/api_album.py Normal file
View file

@ -0,0 +1,55 @@
from errors import *
from objects import Album
class ApiAlbums:
def __init__(self, client):
self._client = client
def list(self, **kwds):
""" Return a list of Album objects """
results = self._client.get("/albums/list.json", **kwds)["result"]
return [Album(self._client, album) for album in results]
class ApiAlbum:
def __init__(self, client):
self._client = client
def create(self, name, **kwds):
""" Create a new album and return it"""
result = self._client.post("/album/create.json", name=name, **kwds)["result"]
return Album(self._client, result)
def delete(self, album, **kwds):
""" Delete an album """
if not isinstance(album, Album):
album = Album(self._client, {"id": album})
album.delete(**kwds)
def form(self, album, **kwds):
raise NotImplementedError()
def add_photos(self, album, photos, **kwds):
raise NotImplementedError()
def remove_photos(self, album, photos, **kwds):
raise NotImplementedError()
def update(self, album, **kwds):
""" Update an album """
if not isinstance(album, Album):
album = Album(self._client, {"id": album})
album.update(**kwds)
# Don't return the album, since the API currently doesn't give us the modified album
# Uncomment the following once frontend issue #937 is resolved
# return album
def view(self, album, **kwds):
"""
View an album's contents.
Returns the requested album object.
"""
if not isinstance(album, Album):
album = Album(self._client, {"id": album})
album.view(**kwds)
return album

92
openphoto/api_photo.py Normal file
View file

@ -0,0 +1,92 @@
import base64
from errors import *
from objects import Photo
class ApiPhotos:
def __init__(self, client):
self._client = client
def list(self, **kwds):
""" Returns a list of Photo objects """
photos = self._client.get("/photos/list.json", **kwds)["result"]
photos = self._client._result_to_list(photos)
return [Photo(self._client, photo) for photo in photos]
def update(self, photos, **kwds):
""" Updates a list of photos """
if not self._client.post("/photos/update.json", ids=photos, **kwds)["result"]:
raise OpenPhotoError("Update response returned False")
def delete(self, photos, **kwds):
""" Deletes a list of photos """
if not self._client.post("/photos/delete.json", ids=photos, **kwds)["result"]:
raise OpenPhotoError("Delete response returned False")
class ApiPhoto:
def __init__(self, client):
self._client = client
def delete(self, photo, **kwds):
""" Delete a photo """
if not isinstance(photo, Photo):
photo = Photo(self._client, {"id": photo})
photo.delete(**kwds)
def edit(self, photo, **kwds):
""" Returns an HTML form to edit a photo """
if not isinstance(photo, Photo):
photo = Photo(self._client, {"id": photo})
return photo.edit(**kwds)
def replace(self, photo, photo_file, **kwds):
raise NotImplementedError()
def replace_encoded(self, photo, photo_file, **kwds):
raise NotImplementedError()
def update(self, photo, **kwds):
"""
Update a photo with the specified parameters.
Returns the updated photo object
"""
if not isinstance(photo, Photo):
photo = Photo(self._client, {"id": photo})
photo.update(**kwds)
return photo
def view(self, photo, **kwds):
"""
Used to view the photo at a particular size.
Returns the requested photo object
"""
if not isinstance(photo, Photo):
photo = Photo(self._client, {"id": photo})
photo.view(**kwds)
return photo
def upload(self, photo_file, **kwds):
raise NotImplementedError("Use upload_encoded instead.")
def upload_encoded(self, photo_file, **kwds):
""" Base64-encodes and uploads the specified file """
encoded_photo = base64.b64encode(open(photo_file, "rb").read())
result = self._client.post("/photo/upload.json", photo=encoded_photo,
**kwds)["result"]
return Photo(self._client, result)
def dynamic_url(self, photo, **kwds):
raise NotImplementedError()
def next_previous(self, photo, **kwds):
"""
Returns a dict containing the next and previous photo objects,
given a photo in the middle.
"""
if not isinstance(photo, Photo):
photo = Photo(self._client, {"id": photo})
return photo.next_previous(**kwds)
def transform(self, photo, **kwds):
raise NotImplementedError()

33
openphoto/api_tag.py Normal file
View file

@ -0,0 +1,33 @@
from errors import *
from objects import Tag
class ApiTags:
def __init__(self, client):
self._client = client
def list(self, **kwds):
""" Returns a list of Tag objects """
results = self._client.get("/tags/list.json", **kwds)["result"]
return [Tag(self._client, tag) for tag in results]
class ApiTag:
def __init__(self, client):
self._client = client
def create(self, tag, **kwds):
""" Create a new tag and return it """
result = self._client.post("/tag/create.json", tag=tag, **kwds)["result"]
return Tag(self._client, result)
def delete(self, tag, **kwds):
""" Delete a tag """
if not isinstance(tag, Tag):
tag = Tag(self._client, {"id": tag})
tag.delete(**kwds)
def update(self, tag, **kwds):
""" Update a tag """
if not isinstance(tag, Tag):
tag = Tag(self._client, {"id": tag})
tag.update(**kwds)
return tag

12
openphoto/errors.py Normal file
View file

@ -0,0 +1,12 @@
class OpenPhotoError(Exception):
""" Indicates that an OpenPhoto operation failed """
pass
class OpenPhotoDuplicateError(OpenPhotoError):
""" Indicates that an upload operation failed due to a duplicate photo """
pass
class NotImplementedError(OpenPhotoError):
""" Indicates that the API function has not yet been coded - please help! """
pass

View file

@ -6,9 +6,9 @@ import urllib
from optparse import OptionParser from optparse import OptionParser
try: try:
import simplejson as json
except:
import json import json
except ImportError:
import simplejson as json
from openphoto import OpenPhoto from openphoto import OpenPhoto
@ -45,9 +45,9 @@ def main(args=sys.argv[1:]):
client = OpenPhoto(options.host, consumer_key, consumer_secret, token, token_secret) client = OpenPhoto(options.host, consumer_key, consumer_secret, token, token_secret)
if options.method == "GET": if options.method == "GET":
result = client.get(options.endpoint, params) result = client.get(options.endpoint, process_response=False, **params)
else: else:
result = client.post(options.endpoint, params) result = client.post(options.endpoint, process_response=False, **params)
if options.verbose: if options.verbose:
print "==========\nMethod: %s\nHost: %s\nEndpoint: %s" % (options.method, options.host, options.endpoint) print "==========\nMethod: %s\nHost: %s\nEndpoint: %s" % (options.method, options.host, options.endpoint)

155
openphoto/objects.py Normal file
View file

@ -0,0 +1,155 @@
from errors import *
class OpenPhotoObject:
""" Base object supporting the storage of custom fields as attributes """
def __init__(self, openphoto, json_dict):
self._openphoto = openphoto
self._json_dict = json_dict
self._set_fields(json_dict)
def _set_fields(self, json_dict):
""" Set this object's attributes specified in json_dict """
for key, value in json_dict.items():
if key.startswith("_"):
raise ValueError("Illegal attribute: %s" % key)
setattr(self, key, value)
def _replace_fields(self, json_dict):
"""
Delete this object's attributes, and replace with
those in json_dict.
"""
for key in self._json_dict.keys():
delattr(self, key)
self._json_dict = json_dict
self._set_fields(json_dict)
def __repr__(self):
if hasattr(self, "name"):
return "<%s name='%s'>" % (self.__class__, self.name)
elif hasattr(self, "id"):
return "<%s id='%s'>" % (self.__class__, self.id)
else:
return "<%s>" % (self.__class__)
def get_fields(self):
""" Returns this object's attributes """
return self._json_dict
class Photo(OpenPhotoObject):
def delete(self, **kwds):
""" Delete this photo """
self._openphoto.post("/photo/%s/delete.json" % self.id, **kwds)
self._replace_fields({})
def edit(self, **kwds):
""" Returns an HTML form to edit the photo """
result = self._openphoto.get("/photo/%s/edit.json" % self.id,
**kwds)["result"]
return result["markup"]
def replace(self, photo_file, **kwds):
raise NotImplementedError()
def replace_encoded(self, encoded_photo, **kwds):
raise NotImplementedError()
def update(self, **kwds):
""" Update this photo with the specified parameters """
new_dict = self._openphoto.post("/photo/%s/update.json" % self.id,
**kwds)["result"]
self._replace_fields(new_dict)
def view(self, **kwds):
"""
Used to view the photo at a particular size.
Updates the photo's fields with the response.
"""
new_dict = self._openphoto.get("/photo/%s/view.json" % self.id,
**kwds)["result"]
self._replace_fields(new_dict)
def dynamic_url(self, **kwds):
raise NotImplementedError()
def next_previous(self, **kwds):
""" Returns a dict containing the next and previous photo objects """
result = self._openphoto.get("/photo/%s/nextprevious.json" % self.id,
**kwds)["result"]
value = {}
if "next" in result:
value["next"] = Photo(self._openphoto, result["next"])
if "previous" in result:
value["previous"] = Photo(self._openphoto, result["previous"])
return value
def transform(self, **kwds):
raise NotImplementedError()
class Tag(OpenPhotoObject):
def delete(self, **kwds):
""" Delete this tag """
self._openphoto.post("/tag/%s/delete.json" % self.id, **kwds)
self._replace_fields({})
def update(self, **kwds):
""" Update this tag with the specified parameters """
new_dict = self._openphoto.post("/tag/%s/update.json" % self.id,
**kwds)["result"]
self._replace_fields(new_dict)
class Album(OpenPhotoObject):
def __init__(self, openphoto, json_dict):
OpenPhotoObject.__init__(self, openphoto, json_dict)
self._update_fields_with_objects()
def _update_fields_with_objects(self):
""" Convert dict fields into objects, where appropriate """
# Update the cover with a photo object
if hasattr(self, "cover") and isinstance(self.cover, dict):
self.cover = Photo(self._openphoto, self.cover)
# Update the photo list with photo objects
if hasattr(self, "photos") and isinstance(self.photos, list):
for i, photo in enumerate(self.photos):
if isinstance(photo, dict):
self.photos[i] = Photo(self._openphoto, photo)
def delete(self, **kwds):
""" Delete this album """
self._openphoto.post("/album/%s/delete.json" % self.id, **kwds)
self._replace_fields({})
def form(self, **kwds):
raise NotImplementedError()
def add_photos(self, **kwds):
raise NotImplementedError()
def remove_photos(self, **kwds):
raise NotImplementedError()
def update(self, **kwds):
""" Update this album with the specified parameters """
new_dict = self._openphoto.post("/album/%s/update.json" % self.id,
**kwds)["result"]
# Since the API doesn't give us the modified album, we need to
# update our fields based on the kwds that were sent
self._set_fields(kwds)
# Replace the above line with the below once frontend issue #937 is resolved
# self._set_fields(new_dict)
# self._update_fields_with_objects()
def view(self, **kwds):
"""
Requests the full contents of the album.
Updates the album's fields with the response.
"""
result = self._openphoto.get("/album/%s/view.json" % self.id,
**kwds)["result"]
self._replace_fields(result)
self._update_fields_with_objects()

150
openphoto/openphoto_http.py Normal file
View file

@ -0,0 +1,150 @@
import oauth2 as oauth
import urlparse
import urllib
import httplib2
try:
import json
except ImportError:
import simplejson as json
from objects import OpenPhotoObject
from errors import *
DUPLICATE_RESPONSE = {"code": 409,
"message": "This photo already exists"}
class OpenPhotoHttp:
""" Base class to handle HTTP requests to an OpenPhoto server """
def __init__(self, host, consumer_key='', consumer_secret='',
token='', token_secret=''):
self._host = host
self._consumer_key = consumer_key
self._consumer_secret = consumer_secret
self._token = token
self._token_secret = token_secret
# Remember the most recent HTTP request and response
self.last_url = None
self.last_params = None
self.last_response = None
def get(self, endpoint, process_response=True, **params):
"""
Performs an HTTP GET from the specified endpoint (API path),
passing parameters if given.
Returns the decoded JSON dictionary, and raises exceptions if an
error code is received.
Returns the raw response if process_response=False
"""
params = self._process_params(params)
url = urlparse.urlunparse(('http', self._host, endpoint, '',
urllib.urlencode(params), ''))
if self._consumer_key:
consumer = oauth.Consumer(self._consumer_key, self._consumer_secret)
token = oauth.Token(self._token, self._token_secret)
client = oauth.Client(consumer, token)
else:
client = httplib2.Http()
_, content = client.request(url, "GET")
self.last_url = url
self.last_params = params
self.last_response = content
if process_response:
return self._process_response(content)
return response
else:
return content
def post(self, endpoint, process_response=True, **params):
"""
Performs an HTTP POST to the specified endpoint (API path),
passing parameters if given.
Returns the decoded JSON dictionary, and raises exceptions if an
error code is received.
Returns the raw response if process_response=False
"""
params = self._process_params(params)
url = urlparse.urlunparse(('http', self._host, endpoint, '', '', ''))
if not self._consumer_key:
raise OpenPhotoError("Cannot issue POST without OAuth tokens")
consumer = oauth.Consumer(self._consumer_key, self._consumer_secret)
token = oauth.Token(self._token, self._token_secret)
client = oauth.Client(consumer, token)
body = urllib.urlencode(params)
_, content = client.request(url, "POST", body)
self.last_url = url
self.last_params = params
self.last_response = content
if process_response:
return self._process_response(content)
else:
return content
@staticmethod
def _process_params(params):
""" Converts Unicode/lists/booleans inside HTTP parameters """
processed_params = {}
for key, value in params.items():
# Extract IDs from objects
if isinstance(value, OpenPhotoObject):
value = value.id
# Use UTF-8 encoding
if isinstance(value, unicode):
value = value.encode('utf-8')
# Handle lists
if isinstance(value, list):
# Make a copy of the list, to avoid overwriting the original
new_list = list(value)
# Extract IDs from objects in the list
for i, item in enumerate(new_list):
if isinstance(item, OpenPhotoObject):
new_list[i] = item.id
# Convert list to unicode string
value = u','.join([unicode(item) for item in new_list])
# Handle booleans
if isinstance(value, bool):
value = 1 if value else 0
processed_params[key] = value
return processed_params
@staticmethod
def _process_response(content):
"""
Decodes the JSON response, returning a dict.
Raises an exception if an invalid response code is received.
"""
response = json.loads(content)
if response["code"] >= 200 and response["code"] < 300:
# Valid response code
return response
error_message = "Code %d: %s" % (response["code"],
response["message"])
# Special case for a duplicate photo error
if (response["code"] == DUPLICATE_RESPONSE["code"] and
DUPLICATE_RESPONSE["message"] in response["message"]):
raise OpenPhotoDuplicateError(error_message)
raise OpenPhotoError(error_message)
@staticmethod
def _result_to_list(result):
""" Handle the case where the result contains no items """
if result[0]["totalRows"] == 0:
return []
else:
return result

4
setup.py Normal file → Executable file
View file

@ -1,3 +1,5 @@
#!/usr/bin/env python
requires = ['oauth2', 'httplib2'] requires = ['oauth2', 'httplib2']
try: try:
import json import json
@ -17,7 +19,7 @@ except ImportError:
'requires': requires} 'requires': requires}
setup(name='openphoto', setup(name='openphoto',
version='0.1', version='0.2',
description='Client library for the openphoto project', description='Client library for the openphoto project',
author='James Walker', author='James Walker',
author_email='walkah@walkah.net', author_email='walkah@walkah.net',

61
tests/README.markdown Normal file
View file

@ -0,0 +1,61 @@
Tests for the Open Photo API / Python Library
=======================
#### OpenPhoto, a photo service for the masses
----------------------------------------
<a name="requirements"></a>
### Requirements
A computer, Python 2.7 and an empty OpenPhoto instance.
---------------------------------------
<a name="setup"></a>
### Setting up
Create a tests/tokens.py file containing the following:
# tests/token.py
consumer_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
consumer_secret = "xxxxxxxxxx"
token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
token_secret = "xxxxxxxxxx"
host = "your_hostname"
Make sure this is an empty test server, **not a production OpenPhoto server!!!**
---------------------------------------
<a name="running"></a>
### Running the tests
cd /path/to/openphoto-python
python -m unittest discover -c
The "-c" lets you stop the tests gracefully with \[CTRL\]-c.
The easiest way to run a subset of the tests is with nose:
cd /path/to/openphoto-python
nosetests -v -s tests/test_albums.py:TestAlbums.test_view
---------------------------------------
<a name="test_details"></a>
### Test Details
These tests are intended to verify the Python library. They don't provide comprehensive testing of the OpenPhoto API, there are PHP unit tests for that.
Each test class is run as follows:
**SetUpClass:**
Check that the server is empty
**SetUp:**
Ensure there are:
* Three test photos
* A single test tag applied to each
* A single album containing all three photos
**TearDownClass:**
Remove all photos, tags and albums

0
tests/__init__.py Normal file
View file

89
tests/test_albums.py Normal file
View file

@ -0,0 +1,89 @@
import unittest
import openphoto
import test_base
class TestAlbums(test_base.TestBase):
def test_create_delete(self):
""" Create an album then delete it """
album_name = "create_delete_album"
album = self.client.album.create(album_name, visible=True)
# Check the return value
self.assertEqual(album.name, album_name)
# Check that the album now exists
self.assertIn(album_name, [a.name for a in self.client.albums.list()])
# Delete the album
self.client.album.delete(album.id)
# Check that the album is now gone
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
# Create it again, and delete it using the Album object
album = self.client.album.create(album_name, visible=True)
album.delete()
# Check that the album is now gone
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
def test_update(self):
""" Test that an album can be updated """
# Update the album using the OpenPhoto class, passing in the album object
new_name = "New Name"
self.client.album.update(self.albums[0], name=new_name)
# Check that the album is updated
self.albums = self.client.albums.list()
self.assertEqual(self.albums[0].name, new_name)
# Update the album using the OpenPhoto class, passing in the album id
new_name = "Another New Name"
self.client.album.update(self.albums[0].id, name=new_name)
# Check that the album is updated
self.albums = self.client.albums.list()
self.assertEqual(self.albums[0].name, new_name)
# Update the album using the Album object directly
self.albums[0].update(name=self.TEST_ALBUM)
# Check that the album is updated
self.albums = self.client.albums.list()
self.assertEqual(self.albums[0].name, self.TEST_ALBUM)
def test_view(self):
""" Test the album view """
album = self.albums[0]
self.assertFalse(hasattr(album, "photos"))
# Get the photos in the album using the Album object directly
album.view()
# Make sure all photos are in the album
for photo in self.photos:
self.assertIn(photo.id, [p.id for p in album.photos])
@unittest.expectedFailure # Private albums are not visible - issue #929
def test_private(self):
""" Test that private albums can be created, and are visible """
# Create and check that the album now exists
album_name = "private_album"
album = self.client.album.create(album_name, visible=False)
self.assertIn(album_name, [a.name for a in self.client.albums.list()])
# Delete and check that the album is now gone
album.delete()
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
def test_form(self):
""" If album.form gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.album.form(None)
def test_add_photos(self):
""" If album.add_photos gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.album.add_photos(None, None)
def test_remove_photos(self):
""" If album.remove_photos gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.album.remove_photos(None, None)

128
tests/test_base.py Normal file
View file

@ -0,0 +1,128 @@
import unittest
import openphoto
try:
import tokens
except ImportError:
print ("********************************************************************\n"
"You need to create a 'tokens.py' file containing the following:\n\n"
" host = \"<test_url>\"\n"
" consumer_key = \"<test_consumer_key>\"\n"
" consumer_secret = \"<test_consumer_secret>\"\n"
" token = \"<test_token>\"\n"
" token_secret = \"<test_token_secret>\"\n"
" host = \"<hostname>\"\n\n"
"WARNING: Don't use a production OpenPhoto instance for this!\n"
"********************************************************************\n")
raise
class TestBase(unittest.TestCase):
TEST_TITLE = "Test Image - delete me!"
TEST_TAG = "test_tag"
TEST_ALBUM = "test_album"
MAXIMUM_TEST_PHOTOS = 4 # Never have more the 4 photos on the test server
def __init__(self, *args, **kwds):
unittest.TestCase.__init__(self, *args, **kwds)
self.photos = []
@classmethod
def setUpClass(cls):
""" Ensure there is nothing on the server before running any tests """
cls.client = openphoto.OpenPhoto(tokens.host,
tokens.consumer_key, tokens.consumer_secret,
tokens.token, tokens.token_secret)
if cls.client.photos.list() != []:
raise ValueError("The test server (%s) contains photos. "
"Please delete them before running the tests"
% tokens.host)
if cls.client.tags.list() != []:
raise ValueError("The test server (%s) contains tags. "
"Please delete them before running the tests"
% tokens.host)
if cls.client.albums.list() != []:
raise ValueError("The test server (%s) contains albums. "
"Please delete them before running the tests"
% tokens.host)
@classmethod
def tearDownClass(cls):
""" Once all tests have finished, delete all photos, tags and albums"""
cls._delete_all()
def setUp(self):
"""
Ensure the three test photos are present before each test.
Give them each a tag.
Put them into an album.
"""
self.photos = self.client.photos.list()
if len(self.photos) != 3:
print "[Regenerating Photos]"
if len(self.photos) > 0:
self._delete_all()
self._create_test_photos()
self.photos = self.client.photos.list()
self.tags = self.client.tags.list()
if (len(self.tags) != 1 or
self.tags[0].id != self.TEST_TAG or
self.tags[0].count != "3"):
print "[Regenerating Tags]"
self._delete_all()
self._create_test_photos()
self.photos = self.client.photos.list()
self.tags = self.client.tags.list()
if len(self.tags) != 1:
print "Tags: %s" % self.tags
raise Exception("Tag creation failed")
self.albums = self.client.albums.list()
if (len(self.albums) != 1 or
self.albums[0].name != self.TEST_ALBUM or
self.albums[0].count != "3"):
print "[Regenerating Albums]"
self._delete_all()
self._create_test_photos()
self.photos = self.client.photos.list()
self.tags = self.client.tags.list()
self.albums = self.client.albums.list()
if len(self.albums) != 1:
print "Albums: %s" % self.albums
raise Exception("Album creation failed")
@classmethod
def _create_test_photos(cls):
""" Upload three test photos """
album = cls.client.album.create(cls.TEST_ALBUM, visible=True)
photos = [
cls.client.photo.upload_encoded("tests/test_photo1.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
cls.client.photo.upload_encoded("tests/test_photo2.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
cls.client.photo.upload_encoded("tests/test_photo3.jpg",
title=cls.TEST_TITLE,
tags=cls.TEST_TAG),
]
# Remove the auto-generated month/year tags
tags_to_remove = [p for p in photos[0].tags if p != cls.TEST_TAG]
for photo in photos:
photo.update(tagsRemove=tags_to_remove, albums=album.id)
@classmethod
def _delete_all(cls):
photos = cls.client.photos.list()
if len(photos) > cls.MAXIMUM_TEST_PHOTOS:
raise ValueError("There too many photos on the test server - must always be less than %d."
% cls.MAXIMUM_TEST_PHOTOS)
for photo in photos:
photo.delete()
for tag in cls.client.tags.list():
tag.delete()
for album in cls.client.albums.list():
album.delete()

BIN
tests/test_photo1.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

BIN
tests/test_photo2.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 854 B

BIN
tests/test_photo3.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 579 B

157
tests/test_photos.py Normal file
View file

@ -0,0 +1,157 @@
import unittest
import openphoto
import test_base
class TestPhotos(test_base.TestBase):
def test_delete_upload(self):
""" Test photo deletion and upload """
# Delete one photo using the OpenPhoto class, passing in the id
self.client.photo.delete(self.photos[0].id)
# Delete one photo using the OpenPhoto class, passing in the object
self.client.photo.delete(self.photos[1])
# And another using the Photo object directly
self.photos[2].delete()
# Check that they're gone
self.assertEqual(self.client.photos.list(), [])
# Re-upload the photos
ret_val = self.client.photo.upload_encoded("tests/test_photo1.jpg",
title=self.TEST_TITLE)
self.client.photo.upload_encoded("tests/test_photo2.jpg",
title=self.TEST_TITLE)
self.client.photo.upload_encoded("tests/test_photo3.jpg",
title=self.TEST_TITLE)
# Check there are now three photos
self.photos = self.client.photos.list()
self.assertEqual(len(self.photos), 3)
# Check that the upload return value was correct
pathOriginals = [photo.pathOriginal for photo in self.photos]
self.assertIn(ret_val.pathOriginal, pathOriginals)
# Delete all photos in one go
self.client.photos.delete(self.photos)
# Check they're gone
self.photos = self.client.photos.list()
self.assertEqual(len(self.photos), 0)
# Regenerate the original test photos
self._delete_all()
self._create_test_photos()
def test_edit(self):
""" Check that the edit request returns an HTML form """
# Test using the OpenPhoto class
html = self.client.photo.edit(self.photos[0])
self.assertIn("<form", html.lower())
# And the Photo object directly
html = self.photos[0].edit()
self.assertIn("<form", html.lower())
def test_upload_duplicate(self):
""" Ensure that duplicate photos are rejected """
# Attempt to upload a duplicate
with self.assertRaises(openphoto.OpenPhotoDuplicateError):
self.client.photo.upload_encoded("tests/test_photo1.jpg",
title=self.TEST_TITLE)
# Check there are still three photos
self.photos = self.client.photos.list()
self.assertEqual(len(self.photos), 3)
def test_update(self):
""" Update a photo by editing the title """
title = u"\xfcmlaut" # umlauted umlaut
# Get a photo and check that it doesn't have the magic title
photo = self.photos[0]
self.assertNotEqual(photo.title, title)
# Add the title to a photo using the OpenPhoto class
ret_val = self.client.photo.update(photo, title=title)
# Check that it's there
self.photos = self.client.photos.list()
photo = self.photos[0]
self.assertEqual(photo.title, title)
# Check that the return value was correct
self.assertEqual(ret_val.pathOriginal, photo.pathOriginal)
# Revert the title using the Photo object directly
photo.update(title=self.TEST_TITLE)
# Check that it's gone back
self.photos = self.client.photos.list()
self.assertEqual(self.photos[0].title, self.TEST_TITLE)
def test_update_multiple(self):
""" Update multiple photos by adding tags """
tag_id = "update_photo_tag"
# Get a couple of photos
photos = self.photos[:2]
# Add the tag using a list of photo objects
self.client.photos.update(photos, tagsAdd=tag_id)
# Check that it's there
for photo in self.client.photos.list()[:2]:
self.assertIn(tag_id, photo.tags)
# Remove the tags using a list of photo ids
self.client.photos.update([photo.id for photo in photos],
tagsRemove=tag_id)
def test_view(self):
""" Test photo view """
# Check that our magic sizes aren't present
photo = self.photos[0]
self.assertFalse(hasattr(photo, "path9x9"))
self.assertFalse(hasattr(photo, "path19x19"))
# View at a particular size using the OpenPhoto class
photo = self.client.photo.view(photo, returnSizes="9x9")
self.assertTrue(hasattr(photo, "path9x9"))
# View at a particular size using the Photo object directly
photo.view(returnSizes="19x19")
self.assertTrue(hasattr(photo, "path19x19"))
def test_next_previous(self):
""" Test the next/previous links of the middle photo """
next_prev = self.client.photo.next_previous(self.photos[1])
self.assertEqual(next_prev["previous"].id, self.photos[0].id)
self.assertEqual(next_prev["next"].id, self.photos[2].id)
# Do the same using the Photo object directly
next_prev = self.photos[1].next_previous()
self.assertEqual(next_prev["previous"].id, self.photos[0].id)
self.assertEqual(next_prev["next"].id, self.photos[2].id)
def test_replace(self):
""" If photo.replace gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.replace(None, None)
def test_replace_encoded(self):
""" If photo.replace_encoded gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.replace_encoded(None, None)
def test_upload(self):
""" If photo.upload gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.upload(None)
def test_dynamic_url(self):
""" If photo.dynamic_url gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.dynamic_url(None)
def test_transform(self):
""" If photo.transform gets implemented, write a test! """
with self.assertRaises(openphoto.NotImplementedError):
self.client.photo.transform(None)

71
tests/test_tags.py Normal file
View file

@ -0,0 +1,71 @@
import unittest
import openphoto
import test_base
class TestTags(test_base.TestBase):
@unittest.expectedFailure # Tag create fails - Issue #927
# NOTE: the below has not been tested/debugged, since it fails at the first step
def test_create_delete(self, tag_name="create_tag"):
""" Create a tag then delete it """
# Create a tag
tag = self.client.tag.create(tag_name)
# Check the return value
self.assertEqual(tag.id, tag_name)
# Check that the tag now exists
self.assertIn(tag_name, self.client.tags.list())
# Delete the tag
self.client.tag.delete(tag_name)
# Check that the tag is now gone
self.assertNotIn(tag_name, self.client.tags.list())
# Create and delete using the Tag object directly
tag = self.client.tag.create(tag_name)
tag.delete()
# Check that the tag is now gone
self.assertNotIn(tag_name, self.client.tags.list())
@unittest.expectedFailure # Tag update fails - Issue #927
# NOTE: the below has not been tested/debugged, since it fails at the first step
def test_update(self):
""" Test that a tag can be updated """
# Update the tag using the OpenPhoto class, passing in the tag object
owner = "test1@openphoto.me"
ret_val = self.client.tag.update(self.tags[0], owner=owner)
# Check that the tag is updated
self.tags = self.client.tags.list()
self.assertEqual(self.tags[0].owner, owner)
self.assertEqual(ret_val.owner, owner)
# Update the tag using the OpenPhoto class, passing in the tag id
owner = "test2@openphoto.me"
ret_val = self.client.tag.update(self.TEST_TAG, owner=owner)
# Check that the tag is updated
self.tags = self.client.tags.list()
self.assertEqual(self.tags[0].owner, owner)
self.assertEqual(ret_val.owner, owner)
# Update the tag using the Tag object directly
owner = "test3@openphoto.me"
ret_val = self.tags[0].update(owner=owner)
# Check that the tag is updated
self.tags = self.client.tags.list()
self.assertEqual(self.tags[0].owner, owner)
self.assertEqual(ret_val.owner, owner)
@unittest.expectedFailure # Tag create fails - Issue #927
# NOTE: the below has not been tested/debugged, since it fails at the first step
def test_tag_with_spaces(self):
""" Run test_create_delete using a tag containing spaces """
self.test_create_delete("tag with spaces")
# We mustn't run this test until Issue #919 is resolved,
# since it creates an undeletable tag
@unittest.skip("Tags with double-slashes cannot be deleted - Issue #919")
def test_tag_with_double_slashes(self):
""" Run test_create_delete using a tag containing slashes """
self.test_create_delete("tag/with//slashes")