PyLint fixes
Moved credentials into new Config class
This commit is contained in:
parent
b124b48a75
commit
48e29f24a9
16 changed files with 277 additions and 218 deletions
|
@ -1,4 +1,3 @@
|
|||
from openphoto.errors import *
|
||||
from openphoto.objects import Album
|
||||
|
||||
class ApiAlbums:
|
||||
|
@ -16,7 +15,8 @@ class ApiAlbum:
|
|||
|
||||
def create(self, name, **kwds):
|
||||
""" Create a new album and return it"""
|
||||
result = self._client.post("/album/create.json", name=name, **kwds)["result"]
|
||||
result = self._client.post("/album/create.json",
|
||||
name=name, **kwds)["result"]
|
||||
return Album(self._client, result)
|
||||
|
||||
def delete(self, album, **kwds):
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import base64
|
||||
|
||||
from openphoto.errors import *
|
||||
from openphoto.errors import OpenPhotoError
|
||||
import openphoto.openphoto_http
|
||||
from openphoto.objects import Photo
|
||||
|
||||
class ApiPhotos:
|
||||
|
@ -10,7 +11,7 @@ class ApiPhotos:
|
|||
def list(self, **kwds):
|
||||
""" Returns a list of Photo objects """
|
||||
photos = self._client.get("/photos/list.json", **kwds)["result"]
|
||||
photos = self._client._result_to_list(photos)
|
||||
photos = openphoto.openphoto_http.result_to_list(photos)
|
||||
return [Photo(self._client, photo) for photo in photos]
|
||||
|
||||
def update(self, photos, **kwds):
|
||||
|
@ -19,7 +20,8 @@ class ApiPhotos:
|
|||
Returns True if successful.
|
||||
Raises OpenPhotoError if not.
|
||||
"""
|
||||
if not self._client.post("/photos/update.json", ids=photos, **kwds)["result"]:
|
||||
if not self._client.post("/photos/update.json", ids=photos,
|
||||
**kwds)["result"]:
|
||||
raise OpenPhotoError("Update response returned False")
|
||||
return True
|
||||
|
||||
|
@ -29,7 +31,8 @@ class ApiPhotos:
|
|||
Returns True if successful.
|
||||
Raises OpenPhotoError if not.
|
||||
"""
|
||||
if not self._client.post("/photos/delete.json", ids=photos, **kwds)["result"]:
|
||||
if not self._client.post("/photos/delete.json", ids=photos,
|
||||
**kwds)["result"]:
|
||||
raise OpenPhotoError("Delete response returned False")
|
||||
return True
|
||||
|
||||
|
@ -80,16 +83,17 @@ class ApiPhoto:
|
|||
return photo
|
||||
|
||||
def upload(self, photo_file, **kwds):
|
||||
with open(photo_file, 'rb') as f:
|
||||
""" Uploads the specified file to the server """
|
||||
with open(photo_file, 'rb') as in_file:
|
||||
result = self._client.post("/photo/upload.json",
|
||||
files={'photo': f},
|
||||
files={'photo': in_file},
|
||||
**kwds)["result"]
|
||||
return Photo(self._client, result)
|
||||
|
||||
def upload_encoded(self, photo_file, **kwds):
|
||||
""" Base64-encodes and uploads the specified file """
|
||||
with open(photo_file, "rb") as f:
|
||||
encoded_photo = base64.b64encode(f.read())
|
||||
with open(photo_file, "rb") as in_file:
|
||||
encoded_photo = base64.b64encode(in_file.read())
|
||||
result = self._client.post("/photo/upload.json", photo=encoded_photo,
|
||||
**kwds)["result"]
|
||||
return Photo(self._client, result)
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
from openphoto.errors import *
|
||||
from openphoto.objects import Tag
|
||||
|
||||
class ApiTags:
|
||||
|
@ -15,7 +14,10 @@ class ApiTag:
|
|||
self._client = client
|
||||
|
||||
def create(self, tag, **kwds):
|
||||
""" Create a new tag. The API returns true if the tag was sucessfully created """
|
||||
"""
|
||||
Create a new tag.
|
||||
The API returns true if the tag was sucessfully created
|
||||
"""
|
||||
return self._client.post("/tag/create.json", tag=tag, **kwds)["result"]
|
||||
|
||||
def delete(self, tag, **kwds):
|
||||
|
|
|
@ -9,7 +9,33 @@ try:
|
|||
except ImportError:
|
||||
import StringIO as io # Python2
|
||||
|
||||
class Config:
|
||||
def __init__(self, config_file, host,
|
||||
consumer_key, consumer_secret,
|
||||
token, token_secret):
|
||||
if host is None:
|
||||
self.config_path = get_config_path(config_file)
|
||||
config = read_config(self.config_path)
|
||||
self.host = config['host']
|
||||
self.consumer_key = config['consumerKey']
|
||||
self.consumer_secret = config['consumerSecret']
|
||||
self.token = config['token']
|
||||
self.token_secret = config['tokenSecret']
|
||||
else:
|
||||
self.config_path = None
|
||||
self.host = host
|
||||
self.consumer_key = consumer_key
|
||||
self.consumer_secret = consumer_secret
|
||||
self.token = token
|
||||
self.token_secret = token_secret
|
||||
|
||||
if host is not None and config_file is not None:
|
||||
raise ValueError("Cannot specify both host and config_file")
|
||||
|
||||
def get_config_path(config_file):
|
||||
"""
|
||||
Given the name of a config file, returns the full path
|
||||
"""
|
||||
config_path = os.getenv('XDG_CONFIG_HOME')
|
||||
if not config_path:
|
||||
config_path = os.path.join(os.getenv('HOME'), ".config")
|
||||
|
@ -27,11 +53,12 @@ def read_config(config_path):
|
|||
'consumerKey': '', 'consumerSecret': '',
|
||||
'token': '', 'tokenSecret':'',
|
||||
}
|
||||
# Insert an section header at the start of the config file, so ConfigParser can understand it
|
||||
# Insert an section header at the start of the config file,
|
||||
# so ConfigParser can understand it
|
||||
buf = io.StringIO()
|
||||
buf.write('[%s]\n' % section)
|
||||
with io.open(config_path, "r") as f:
|
||||
buf.write(f.read())
|
||||
with io.open(config_path, "r") as conf:
|
||||
buf.write(conf.read())
|
||||
|
||||
buf.seek(0, os.SEEK_SET)
|
||||
parser = ConfigParser()
|
||||
|
@ -43,8 +70,10 @@ def read_config(config_path):
|
|||
|
||||
# Trim quotes
|
||||
config = parser.items(section)
|
||||
config = [(item[0].replace('"', ''), item[1].replace('"', '')) for item in config]
|
||||
config = [(item[0].replace("'", ""), item[1].replace("'", "")) for item in config]
|
||||
config = [(item[0].replace('"', ''), item[1].replace('"', ''))
|
||||
for item in config]
|
||||
config = [(item[0].replace("'", ""), item[1].replace("'", ""))
|
||||
for item in config]
|
||||
config = dict(config)
|
||||
|
||||
# Apply defaults
|
|
@ -7,5 +7,8 @@ class OpenPhotoDuplicateError(OpenPhotoError):
|
|||
pass
|
||||
|
||||
class OpenPhoto404Error(Exception):
|
||||
""" Indicates that an Http 404 error code was received (resource not found) """
|
||||
"""
|
||||
Indicates that an Http 404 error code was received
|
||||
(resource not found)
|
||||
"""
|
||||
pass
|
||||
|
|
|
@ -1,33 +1,51 @@
|
|||
#!/usr/bin/env python
|
||||
import os
|
||||
import sys
|
||||
import string
|
||||
import json
|
||||
from optparse import OptionParser
|
||||
|
||||
from openphoto import OpenPhoto
|
||||
|
||||
CONFIG_ERROR = """
|
||||
You must create a configuration file with the following contents:
|
||||
host = your.host.com
|
||||
consumerKey = your_consumer_key
|
||||
consumerSecret = your_consumer_secret
|
||||
token = your_access_token
|
||||
tokenSecret = your_access_token_secret
|
||||
|
||||
To get your credentials:
|
||||
* Log into your Trovebox site
|
||||
* Click the arrow on the top-right and select 'Settings'.
|
||||
* Click the 'Create a new app' button.
|
||||
* Click the 'View' link beside the newly created app.
|
||||
"""
|
||||
|
||||
#################################################################
|
||||
|
||||
def main(args=sys.argv[1:]):
|
||||
usage = "%prog --help"
|
||||
parser = OptionParser(usage, add_help_option=False)
|
||||
parser.add_option('-c', '--config', action='store', type='string', dest='config_file',
|
||||
help="Configuration file to use")
|
||||
parser.add_option('-h', '-H', '--host', action='store', type='string', dest='host',
|
||||
help="Hostname of the OpenPhoto server (overrides config_file)")
|
||||
parser.add_option('-X', action='store', type='choice', dest='method', choices=('GET', 'POST'),
|
||||
help="Method to use (GET or POST)", default="GET")
|
||||
parser.add_option('-F', action='append', type='string', dest='fields',
|
||||
help="Fields")
|
||||
parser.add_option('-e', action='store', type='string', dest='endpoint',
|
||||
default='/photos/list.json',
|
||||
help="Endpoint to call")
|
||||
parser.add_option('-p', action="store_true", dest="pretty", default=False,
|
||||
help="Pretty print the json")
|
||||
parser.add_option('-v', action="store_true", dest="verbose", default=False,
|
||||
help="Verbose output")
|
||||
parser.add_option('--help', action="store_true", help='show this help message')
|
||||
parser.add_option('-c', '--config', help="Configuration file to use",
|
||||
action='store', type='string', dest='config_file')
|
||||
parser.add_option('-h', '-H', '--host',
|
||||
help=("Hostname of the OpenPhoto server "
|
||||
"(overrides config_file)"),
|
||||
action='store', type='string', dest='host')
|
||||
parser.add_option('-X', help="Method to use (GET or POST)",
|
||||
action='store', type='choice', dest='method',
|
||||
choices=('GET', 'POST'), default="GET")
|
||||
parser.add_option('-F', help="Endpoint field",
|
||||
action='append', type='string', dest='fields')
|
||||
parser.add_option('-e', help="Endpoint to call",
|
||||
action='store', type='string', dest='endpoint',
|
||||
default='/photos/list.json')
|
||||
parser.add_option('-p', help="Pretty print the json",
|
||||
action="store_true", dest="pretty", default=False)
|
||||
parser.add_option('-v', help="Verbose output",
|
||||
action="store_true", dest="verbose", default=False)
|
||||
parser.add_option('--help', help='show this help message',
|
||||
action="store_true")
|
||||
|
||||
options, args = parser.parse_args(args)
|
||||
|
||||
|
@ -41,7 +59,7 @@ def main(args=sys.argv[1:]):
|
|||
params = {}
|
||||
if options.fields:
|
||||
for field in options.fields:
|
||||
(key, value) = string.split(field, '=')
|
||||
(key, value) = field.split('=')
|
||||
params[key] = value
|
||||
|
||||
# Host option overrides config file settings
|
||||
|
@ -52,39 +70,30 @@ def main(args=sys.argv[1:]):
|
|||
client = OpenPhoto(config_file=options.config_file)
|
||||
except IOError as error:
|
||||
print(error)
|
||||
print()
|
||||
print("You must create a configuration file with the following contents:")
|
||||
print(" host = your.host.com")
|
||||
print(" consumerKey = your_consumer_key")
|
||||
print(" consumerSecret = your_consumer_secret")
|
||||
print(" token = your_access_token")
|
||||
print(" tokenSecret = your_access_token_secret")
|
||||
print()
|
||||
print("To get your credentials:")
|
||||
print(" * Log into your Trovebox site")
|
||||
print(" * Click the arrow on the top-right and select 'Settings'.")
|
||||
print(" * Click the 'Create a new app' button.")
|
||||
print(" * Click the 'View' link beside the newly created app.")
|
||||
print()
|
||||
print(CONFIG_ERROR)
|
||||
print(error)
|
||||
sys.exit(1)
|
||||
|
||||
if options.method == "GET":
|
||||
result = client.get(options.endpoint, process_response=False, **params)
|
||||
result = client.get(options.endpoint, process_response=False,
|
||||
**params)
|
||||
else:
|
||||
params, files = extract_files(params)
|
||||
result = client.post(options.endpoint, process_response=False, files=files, **params)
|
||||
result = client.post(options.endpoint, process_response=False,
|
||||
files=files, **params)
|
||||
|
||||
if options.verbose:
|
||||
print("==========\nMethod: %s\nHost: %s\nEndpoint: %s" % (options.method, config['host'], options.endpoint))
|
||||
print("==========\nMethod: %s\nHost: %s\nEndpoint: %s" %
|
||||
(options.method, config['host'], options.endpoint))
|
||||
if len( params ) > 0:
|
||||
print("Fields:")
|
||||
for kv in params.items():
|
||||
print(" %s=%s" % kv)
|
||||
for key, value in params.items():
|
||||
print(" %s=%s" % (key, value))
|
||||
print("==========\n")
|
||||
|
||||
if options.pretty:
|
||||
print(json.dumps(json.loads(result), sort_keys=True, indent=4, separators=(',',':')))
|
||||
print(json.dumps(json.loads(result), sort_keys=True,
|
||||
indent=4, separators=(',',':')))
|
||||
else:
|
||||
print(result)
|
||||
|
||||
|
@ -100,7 +109,8 @@ def extract_files(params):
|
|||
files = {}
|
||||
updated_params = {}
|
||||
for name in params:
|
||||
if name == "photo" and params[name].startswith("@") and os.path.isfile(os.path.expanduser(params[name][1:])):
|
||||
if (name == "photo" and params[name].startswith("@") and
|
||||
os.path.isfile(os.path.expanduser(params[name][1:]))):
|
||||
files[name] = open(params[name][1:], 'rb')
|
||||
else:
|
||||
updated_params[name] = params[name]
|
||||
|
|
|
@ -2,11 +2,12 @@ try:
|
|||
from urllib.parse import quote # Python3
|
||||
except ImportError:
|
||||
from urllib import quote # Python2
|
||||
from .errors import *
|
||||
|
||||
class OpenPhotoObject:
|
||||
""" Base object supporting the storage of custom fields as attributes """
|
||||
def __init__(self, openphoto, json_dict):
|
||||
self.id = None
|
||||
self.name = None
|
||||
self._openphoto = openphoto
|
||||
self._json_dict = json_dict
|
||||
self._set_fields(json_dict)
|
||||
|
@ -29,9 +30,9 @@ class OpenPhotoObject:
|
|||
self._set_fields(json_dict)
|
||||
|
||||
def __repr__(self):
|
||||
if hasattr(self, "name"):
|
||||
if self.name is not None:
|
||||
return "<%s name='%s'>" % (self.__class__, self.name)
|
||||
elif hasattr(self, "id"):
|
||||
elif self.id is not None:
|
||||
return "<%s id='%s'>" % (self.__class__, self.id)
|
||||
else:
|
||||
return "<%s>" % (self.__class__)
|
||||
|
@ -48,14 +49,15 @@ class Photo(OpenPhotoObject):
|
|||
Returns True if successful.
|
||||
Raises an OpenPhotoError if not.
|
||||
"""
|
||||
result = self._openphoto.post("/photo/%s/delete.json" % self.id, **kwds)["result"]
|
||||
result = self._openphoto.post("/photo/%s/delete.json" %
|
||||
self.id, **kwds)["result"]
|
||||
self._replace_fields({})
|
||||
return result
|
||||
|
||||
def edit(self, **kwds):
|
||||
""" Returns an HTML form to edit the photo """
|
||||
result = self._openphoto.get("/photo/%s/edit.json" % self.id,
|
||||
**kwds)["result"]
|
||||
result = self._openphoto.get("/photo/%s/edit.json" %
|
||||
self.id, **kwds)["result"]
|
||||
return result["markup"]
|
||||
|
||||
def replace(self, photo_file, **kwds):
|
||||
|
@ -66,8 +68,8 @@ class Photo(OpenPhotoObject):
|
|||
|
||||
def update(self, **kwds):
|
||||
""" Update this photo with the specified parameters """
|
||||
new_dict = self._openphoto.post("/photo/%s/update.json" % self.id,
|
||||
**kwds)["result"]
|
||||
new_dict = self._openphoto.post("/photo/%s/update.json" %
|
||||
self.id, **kwds)["result"]
|
||||
self._replace_fields(new_dict)
|
||||
|
||||
def view(self, **kwds):
|
||||
|
@ -75,8 +77,8 @@ class Photo(OpenPhotoObject):
|
|||
Used to view the photo at a particular size.
|
||||
Updates the photo's fields with the response.
|
||||
"""
|
||||
new_dict = self._openphoto.get("/photo/%s/view.json" % self.id,
|
||||
**kwds)["result"]
|
||||
new_dict = self._openphoto.get("/photo/%s/view.json" %
|
||||
self.id, **kwds)["result"]
|
||||
self._replace_fields(new_dict)
|
||||
|
||||
def dynamic_url(self, **kwds):
|
||||
|
@ -87,8 +89,8 @@ class Photo(OpenPhotoObject):
|
|||
Returns a dict containing the next and previous photo lists
|
||||
(there may be more than one next/previous photo returned).
|
||||
"""
|
||||
result = self._openphoto.get("/photo/%s/nextprevious.json" % self.id,
|
||||
**kwds)["result"]
|
||||
result = self._openphoto.get("/photo/%s/nextprevious.json" %
|
||||
self.id, **kwds)["result"]
|
||||
value = {}
|
||||
if "next" in result:
|
||||
# Workaround for APIv1
|
||||
|
@ -115,12 +117,13 @@ class Photo(OpenPhotoObject):
|
|||
Performs transformation specified in **kwds
|
||||
Example: transform(rotate=90)
|
||||
"""
|
||||
new_dict = self._openphoto.post("/photo/%s/transform.json" % self.id,
|
||||
**kwds)["result"]
|
||||
new_dict = self._openphoto.post("/photo/%s/transform.json" %
|
||||
self.id, **kwds)["result"]
|
||||
|
||||
# APIv1 doesn't return the transformed photo (frontend issue #955)
|
||||
if isinstance(new_dict, bool):
|
||||
new_dict = self._openphoto.get("/photo/%s/view.json" % self.id)["result"]
|
||||
new_dict = self._openphoto.get("/photo/%s/view.json" %
|
||||
self.id)["result"]
|
||||
|
||||
self._replace_fields(new_dict)
|
||||
|
||||
|
@ -131,7 +134,8 @@ class Tag(OpenPhotoObject):
|
|||
Returns True if successful.
|
||||
Raises an OpenPhotoError if not.
|
||||
"""
|
||||
result = self._openphoto.post("/tag/%s/delete.json" % quote(self.id), **kwds)["result"]
|
||||
result = self._openphoto.post("/tag/%s/delete.json" %
|
||||
quote(self.id), **kwds)["result"]
|
||||
self._replace_fields({})
|
||||
return result
|
||||
|
||||
|
@ -145,15 +149,17 @@ class Tag(OpenPhotoObject):
|
|||
class Album(OpenPhotoObject):
|
||||
def __init__(self, openphoto, json_dict):
|
||||
OpenPhotoObject.__init__(self, openphoto, json_dict)
|
||||
self.photos = None
|
||||
self.cover = None
|
||||
self._update_fields_with_objects()
|
||||
|
||||
def _update_fields_with_objects(self):
|
||||
""" Convert dict fields into objects, where appropriate """
|
||||
# Update the cover with a photo object
|
||||
if hasattr(self, "cover") and isinstance(self.cover, dict):
|
||||
if isinstance(self.cover, dict):
|
||||
self.cover = Photo(self._openphoto, self.cover)
|
||||
# Update the photo list with photo objects
|
||||
if hasattr(self, "photos") and isinstance(self.photos, list):
|
||||
if isinstance(self.photos, list):
|
||||
for i, photo in enumerate(self.photos):
|
||||
if isinstance(photo, dict):
|
||||
self.photos[i] = Photo(self._openphoto, photo)
|
||||
|
@ -164,7 +170,8 @@ class Album(OpenPhotoObject):
|
|||
Returns True if successful.
|
||||
Raises an OpenPhotoError if not.
|
||||
"""
|
||||
result = self._openphoto.post("/album/%s/delete.json" % self.id, **kwds)["result"]
|
||||
result = self._openphoto.post("/album/%s/delete.json" %
|
||||
self.id, **kwds)["result"]
|
||||
self._replace_fields({})
|
||||
return result
|
||||
|
||||
|
@ -179,12 +186,13 @@ class Album(OpenPhotoObject):
|
|||
|
||||
def update(self, **kwds):
|
||||
""" Update this album with the specified parameters """
|
||||
new_dict = self._openphoto.post("/album/%s/update.json" % self.id,
|
||||
**kwds)["result"]
|
||||
new_dict = self._openphoto.post("/album/%s/update.json" %
|
||||
self.id, **kwds)["result"]
|
||||
|
||||
# APIv1 doesn't return the updated album (frontend issue #937)
|
||||
if isinstance(new_dict, bool):
|
||||
new_dict = self._openphoto.get("/album/%s/view.json" % self.id)["result"]
|
||||
new_dict = self._openphoto.get("/album/%s/view.json" %
|
||||
self.id)["result"]
|
||||
|
||||
self._replace_fields(new_dict)
|
||||
self._update_fields_with_objects()
|
||||
|
@ -194,7 +202,7 @@ class Album(OpenPhotoObject):
|
|||
Requests the full contents of the album.
|
||||
Updates the album's fields with the response.
|
||||
"""
|
||||
result = self._openphoto.get("/album/%s/view.json" % self.id,
|
||||
**kwds)["result"]
|
||||
result = self._openphoto.get("/album/%s/view.json" %
|
||||
self.id, **kwds)["result"]
|
||||
self._replace_fields(result)
|
||||
self._update_fields_with_objects()
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from __future__ import unicode_literals
|
||||
import sys
|
||||
import os
|
||||
import requests
|
||||
import requests_oauthlib
|
||||
import logging
|
||||
|
@ -11,16 +10,16 @@ except ImportError:
|
|||
|
||||
from openphoto.objects import OpenPhotoObject
|
||||
from openphoto.errors import *
|
||||
import openphoto.config_files
|
||||
from openphoto.config import Config
|
||||
|
||||
if sys.version < '3':
|
||||
text_type = unicode
|
||||
TEXT_TYPE = unicode
|
||||
# requests_oauth needs to decode to ascii for Python2
|
||||
_oauth_decoding = "utf-8"
|
||||
OAUTH_DECODING = "utf-8"
|
||||
else:
|
||||
text_type = str
|
||||
TEXT_TYPE = str
|
||||
# requests_oauth needs to use (unicode) strings for Python3
|
||||
_oauth_decoding = None
|
||||
OAUTH_DECODING = None
|
||||
|
||||
DUPLICATE_RESPONSE = {"code": 409,
|
||||
"message": "This photo already exists"}
|
||||
|
@ -44,23 +43,11 @@ class OpenPhotoHttp:
|
|||
|
||||
self._logger = logging.getLogger("openphoto")
|
||||
|
||||
if host is None:
|
||||
self._config_path = openphoto.config_files.get_config_path(config_file)
|
||||
config = openphoto.config_files.read_config(self._config_path)
|
||||
self._host = config['host']
|
||||
self._consumer_key = config['consumerKey']
|
||||
self._consumer_secret = config['consumerSecret']
|
||||
self._token = config['token']
|
||||
self._token_secret = config['tokenSecret']
|
||||
else:
|
||||
self._host = host
|
||||
self._consumer_key = consumer_key
|
||||
self._consumer_secret = consumer_secret
|
||||
self._token = token
|
||||
self._token_secret = token_secret
|
||||
self.config = Config(config_file, host,
|
||||
consumer_key, consumer_secret,
|
||||
token, token_secret)
|
||||
|
||||
if host is not None and config_file is not None:
|
||||
raise ValueError("Cannot specify both host and config_file")
|
||||
self.host = self.config.host
|
||||
|
||||
# Remember the most recent HTTP request and response
|
||||
self.last_url = None
|
||||
|
@ -83,17 +70,19 @@ class OpenPhotoHttp:
|
|||
endpoint = "/" + endpoint
|
||||
if self._api_version is not None:
|
||||
endpoint = "/v%d%s" % (self._api_version, endpoint)
|
||||
url = urlunparse(('http', self._host, endpoint, '', '', ''))
|
||||
url = urlunparse(('http', self.host, endpoint, '', '', ''))
|
||||
|
||||
if self._consumer_key:
|
||||
auth = requests_oauthlib.OAuth1(self._consumer_key, self._consumer_secret,
|
||||
self._token, self._token_secret,
|
||||
decoding=_oauth_decoding)
|
||||
if self.config.consumer_key:
|
||||
auth = requests_oauthlib.OAuth1(self.config.consumer_key,
|
||||
self.config.consumer_secret,
|
||||
self.config.token,
|
||||
self.config.token_secret,
|
||||
decoding=OAUTH_DECODING)
|
||||
else:
|
||||
auth = None
|
||||
|
||||
with requests.Session() as s:
|
||||
response = s.get(url, params=params, auth=auth)
|
||||
with requests.Session() as session:
|
||||
response = session.get(url, params=params, auth=auth)
|
||||
|
||||
self._logger.info("============================")
|
||||
self._logger.info("GET %s" % url)
|
||||
|
@ -109,7 +98,7 @@ class OpenPhotoHttp:
|
|||
else:
|
||||
return response.text
|
||||
|
||||
def post(self, endpoint, process_response=True, files = {}, **params):
|
||||
def post(self, endpoint, process_response=True, files=None, **params):
|
||||
"""
|
||||
Performs an HTTP POST to the specified endpoint (API path),
|
||||
passing parameters if given.
|
||||
|
@ -125,22 +114,26 @@ class OpenPhotoHttp:
|
|||
endpoint = "/" + endpoint
|
||||
if self._api_version is not None:
|
||||
endpoint = "/v%d%s" % (self._api_version, endpoint)
|
||||
url = urlunparse(('http', self._host, endpoint, '', '', ''))
|
||||
url = urlunparse(('http', self.host, endpoint, '', '', ''))
|
||||
|
||||
if not self._consumer_key:
|
||||
if not self.config.consumer_key:
|
||||
raise OpenPhotoError("Cannot issue POST without OAuth tokens")
|
||||
|
||||
auth = requests_oauthlib.OAuth1(self._consumer_key, self._consumer_secret,
|
||||
self._token, self._token_secret,
|
||||
decoding=_oauth_decoding)
|
||||
with requests.Session() as s:
|
||||
auth = requests_oauthlib.OAuth1(self.config.consumer_key,
|
||||
self.config.consumer_secret,
|
||||
self.config.token,
|
||||
self.config.token_secret,
|
||||
decoding=OAUTH_DECODING)
|
||||
with requests.Session() as session:
|
||||
if files:
|
||||
# Need to pass parameters as URL query, so they get OAuth signed
|
||||
response = s.post(url, params=params, files=files, auth=auth)
|
||||
response = session.post(url, params=params,
|
||||
files=files, auth=auth)
|
||||
else:
|
||||
# Passing parameters as URL query doesn't work if there are no files to send.
|
||||
# Passing parameters as URL query doesn't work
|
||||
# if there are no files to send.
|
||||
# Send them as form data instead.
|
||||
response = s.post(url, data=params, auth=auth)
|
||||
response = session.post(url, data=params, auth=auth)
|
||||
|
||||
self._logger.info("============================")
|
||||
self._logger.info("POST %s" % url)
|
||||
|
@ -169,7 +162,7 @@ class OpenPhotoHttp:
|
|||
value = value.id
|
||||
|
||||
# Ensure value is UTF-8 encoded
|
||||
if isinstance(value, text_type):
|
||||
if isinstance(value, TEXT_TYPE):
|
||||
value = value.encode("utf-8")
|
||||
|
||||
# Handle lists
|
||||
|
@ -206,9 +199,11 @@ class OpenPhotoHttp:
|
|||
# Status code was valid, so just reraise the exception
|
||||
raise
|
||||
elif response.status_code == 404:
|
||||
raise OpenPhoto404Error("HTTP Error %d: %s" % (response.status_code, response.reason))
|
||||
raise OpenPhoto404Error("HTTP Error %d: %s" %
|
||||
(response.status_code, response.reason))
|
||||
else:
|
||||
raise OpenPhotoError("HTTP Error %d: %s" % (response.status_code, response.reason))
|
||||
raise OpenPhotoError("HTTP Error %d: %s" %
|
||||
(response.status_code, response.reason))
|
||||
|
||||
if 200 <= code < 300:
|
||||
return json_response
|
||||
|
@ -218,12 +213,11 @@ class OpenPhotoHttp:
|
|||
else:
|
||||
raise OpenPhotoError("Code %d: %s" % (code, message))
|
||||
|
||||
@staticmethod
|
||||
def _result_to_list(result):
|
||||
""" Handle the case where the result contains no items """
|
||||
if not result:
|
||||
return []
|
||||
if result[0]["totalRows"] == 0:
|
||||
return []
|
||||
else:
|
||||
return result
|
||||
def result_to_list(result):
|
||||
""" Handle the case where the result contains no items """
|
||||
if not result:
|
||||
return []
|
||||
if result[0]["totalRows"] == 0:
|
||||
return []
|
||||
else:
|
||||
return result
|
||||
|
|
|
@ -1,7 +1,3 @@
|
|||
try:
|
||||
import unittest2 as unittest
|
||||
except ImportError:
|
||||
import unittest
|
||||
from tests import test_albums, test_photos, test_tags
|
||||
|
||||
class TestAlbumsV1(test_albums.TestAlbums):
|
||||
|
|
|
@ -4,14 +4,17 @@ except ImportError:
|
|||
import unittest
|
||||
from tests import test_base, test_albums, test_photos, test_tags
|
||||
|
||||
@unittest.skipIf(test_base.get_test_server_api() < 2, "Don't test future API versions")
|
||||
@unittest.skipIf(test_base.get_test_server_api() < 2,
|
||||
"Don't test future API versions")
|
||||
class TestAlbumsV2(test_albums.TestAlbums):
|
||||
api_version = 2
|
||||
|
||||
@unittest.skipIf(test_base.get_test_server_api() < 2, "Don't test future API versions")
|
||||
@unittest.skipIf(test_base.get_test_server_api() < 2,
|
||||
"Don't test future API versions")
|
||||
class TestPhotosV2(test_photos.TestPhotos):
|
||||
api_version = 2
|
||||
|
||||
@unittest.skipIf(test_base.get_test_server_api() < 2, "Don't test future API versions")
|
||||
@unittest.skipIf(test_base.get_test_server_api() < 2,
|
||||
"Don't test future API versions")
|
||||
class TestTagsV2(test_tags.TestTags):
|
||||
api_version = 2
|
||||
|
|
|
@ -1,9 +1,3 @@
|
|||
try:
|
||||
import unittest2 as unittest # Python2.6
|
||||
except ImportError:
|
||||
import unittest
|
||||
|
||||
import openphoto
|
||||
import tests.test_base
|
||||
|
||||
class TestAlbums(tests.test_base.TestBase):
|
||||
|
@ -17,22 +11,26 @@ class TestAlbums(tests.test_base.TestBase):
|
|||
# Check the return value
|
||||
self.assertEqual(album.name, album_name)
|
||||
# Check that the album now exists
|
||||
self.assertIn(album_name, [a.name for a in self.client.albums.list()])
|
||||
self.assertIn(album_name,
|
||||
[a.name for a in self.client.albums.list()])
|
||||
|
||||
# Delete the album
|
||||
self.assertTrue(self.client.album.delete(album.id))
|
||||
# Check that the album is now gone
|
||||
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
|
||||
self.assertNotIn(album_name,
|
||||
[a.name for a in self.client.albums.list()])
|
||||
|
||||
# Create it again, and delete it using the Album object
|
||||
album = self.client.album.create(album_name)
|
||||
self.assertTrue(album.delete())
|
||||
# Check that the album is now gone
|
||||
self.assertNotIn(album_name, [a.name for a in self.client.albums.list()])
|
||||
self.assertNotIn(album_name,
|
||||
[a.name for a in self.client.albums.list()])
|
||||
|
||||
def test_update(self):
|
||||
""" Test that an album can be updated """
|
||||
# Update the album using the OpenPhoto class, passing in the album object
|
||||
# Update the album using the OpenPhoto class,
|
||||
# passing in the album object
|
||||
new_name = "New Name"
|
||||
self.client.album.update(self.albums[0], name=new_name)
|
||||
|
||||
|
@ -58,7 +56,6 @@ class TestAlbums(tests.test_base.TestBase):
|
|||
def test_view(self):
|
||||
""" Test the album view """
|
||||
album = self.albums[0]
|
||||
self.assertFalse(hasattr(album, "photos"))
|
||||
|
||||
# Get the photos in the album using the Album object directly
|
||||
album.view(includeElements=True)
|
||||
|
|
|
@ -10,7 +10,8 @@ except ImportError:
|
|||
import openphoto
|
||||
|
||||
def get_test_server_api():
|
||||
return int(os.getenv("OPENPHOTO_TEST_SERVER_API", openphoto.LATEST_API_VERSION))
|
||||
return int(os.getenv("OPENPHOTO_TEST_SERVER_API",
|
||||
openphoto.LATEST_API_VERSION))
|
||||
|
||||
class TestBase(unittest.TestCase):
|
||||
TEST_TITLE = "Test Image - delete me!"
|
||||
|
@ -24,7 +25,7 @@ class TestBase(unittest.TestCase):
|
|||
debug = (os.getenv("OPENPHOTO_TEST_DEBUG", "0") == "1")
|
||||
|
||||
def __init__(self, *args, **kwds):
|
||||
unittest.TestCase.__init__(self, *args, **kwds)
|
||||
super(TestBase, self).__init__(*args, **kwds)
|
||||
self.photos = []
|
||||
|
||||
logging.basicConfig(filename="tests.log",
|
||||
|
@ -47,17 +48,17 @@ class TestBase(unittest.TestCase):
|
|||
if cls.client.photos.list() != []:
|
||||
raise ValueError("The test server (%s) contains photos. "
|
||||
"Please delete them before running the tests"
|
||||
% cls.client._host)
|
||||
% cls.client.host)
|
||||
|
||||
if cls.client.tags.list() != []:
|
||||
raise ValueError("The test server (%s) contains tags. "
|
||||
"Please delete them before running the tests"
|
||||
% cls.client._host)
|
||||
% cls.client.host)
|
||||
|
||||
if cls.client.albums.list() != []:
|
||||
raise ValueError("The test server (%s) contains albums. "
|
||||
"Please delete them before running the tests"
|
||||
% cls.client._host)
|
||||
% cls.client.host)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
|
@ -117,10 +118,10 @@ class TestBase(unittest.TestCase):
|
|||
print("Albums: %s" % self.albums)
|
||||
raise Exception("Album creation failed")
|
||||
|
||||
logging.info("\nRunning %s..." % self.id())
|
||||
logging.info("\nRunning %s...", self.id())
|
||||
|
||||
def tearDown(self):
|
||||
logging.info("Finished %s\n" % self.id())
|
||||
logging.info("Finished %s\n", self.id())
|
||||
|
||||
@classmethod
|
||||
def _create_test_photos(cls):
|
||||
|
@ -143,9 +144,11 @@ class TestBase(unittest.TestCase):
|
|||
|
||||
@classmethod
|
||||
def _delete_all(cls):
|
||||
""" Remove all photos, tags and albums """
|
||||
photos = cls.client.photos.list()
|
||||
if len(photos) > cls.MAXIMUM_TEST_PHOTOS:
|
||||
raise ValueError("There too many photos on the test server - must always be less than %d."
|
||||
raise ValueError("There too many photos on the test server "
|
||||
"- must always be less than %d."
|
||||
% cls.MAXIMUM_TEST_PHOTOS)
|
||||
for photo in photos:
|
||||
photo.delete()
|
||||
|
|
|
@ -5,7 +5,7 @@ try:
|
|||
except ImportError:
|
||||
import unittest
|
||||
|
||||
import openphoto
|
||||
from openphoto import OpenPhoto
|
||||
|
||||
CONFIG_HOME_PATH = os.path.join("tests", "config")
|
||||
CONFIG_PATH = os.path.join(CONFIG_HOME_PATH, "openphoto")
|
||||
|
@ -27,68 +27,73 @@ class TestConfig(unittest.TestCase):
|
|||
os.environ["XDG_CONFIG_HOME"] = self.original_xdg_config_home
|
||||
shutil.rmtree(CONFIG_HOME_PATH, ignore_errors=True)
|
||||
|
||||
def create_config(self, config_file, host):
|
||||
with open(os.path.join(CONFIG_PATH, config_file), "w") as f:
|
||||
f.write("host = %s\n" % host)
|
||||
f.write("# Comment\n\n")
|
||||
f.write("consumerKey = \"%s_consumer_key\"\n" % config_file)
|
||||
f.write("\"consumerSecret\" = %s_consumer_secret\n" % config_file)
|
||||
f.write("'token'=%s_token\n" % config_file)
|
||||
f.write("tokenSecret = '%s_token_secret'\n" % config_file)
|
||||
@staticmethod
|
||||
def create_config(config_file, host):
|
||||
with open(os.path.join(CONFIG_PATH, config_file), "w") as conf:
|
||||
conf.write("host = %s\n" % host)
|
||||
conf.write("# Comment\n\n")
|
||||
conf.write("consumerKey = \"%s_consumer_key\"\n" % config_file)
|
||||
conf.write("\"consumerSecret\" = %s_consumer_secret\n" % config_file)
|
||||
conf.write("'token'=%s_token\n" % config_file)
|
||||
conf.write("tokenSecret = '%s_token_secret'\n" % config_file)
|
||||
|
||||
def test_default_config(self):
|
||||
""" Ensure the default config is loaded """
|
||||
self.create_config("default", "Test Default Host")
|
||||
client = openphoto.OpenPhoto()
|
||||
self.assertEqual(client._host, "Test Default Host")
|
||||
self.assertEqual(client._consumer_key, "default_consumer_key")
|
||||
self.assertEqual(client._consumer_secret, "default_consumer_secret")
|
||||
self.assertEqual(client._token, "default_token")
|
||||
self.assertEqual(client._token_secret, "default_token_secret")
|
||||
client = OpenPhoto()
|
||||
config = client.config
|
||||
self.assertEqual(client.host, "Test Default Host")
|
||||
self.assertEqual(config.consumer_key, "default_consumer_key")
|
||||
self.assertEqual(config.consumer_secret, "default_consumer_secret")
|
||||
self.assertEqual(config.token, "default_token")
|
||||
self.assertEqual(config.token_secret, "default_token_secret")
|
||||
|
||||
def test_custom_config(self):
|
||||
""" Ensure a custom config can be loaded """
|
||||
self.create_config("default", "Test Default Host")
|
||||
self.create_config("custom", "Test Custom Host")
|
||||
client = openphoto.OpenPhoto(config_file="custom")
|
||||
self.assertEqual(client._host, "Test Custom Host")
|
||||
self.assertEqual(client._consumer_key, "custom_consumer_key")
|
||||
self.assertEqual(client._consumer_secret, "custom_consumer_secret")
|
||||
self.assertEqual(client._token, "custom_token")
|
||||
self.assertEqual(client._token_secret, "custom_token_secret")
|
||||
client = OpenPhoto(config_file="custom")
|
||||
config = client.config
|
||||
self.assertEqual(client.host, "Test Custom Host")
|
||||
self.assertEqual(config.consumer_key, "custom_consumer_key")
|
||||
self.assertEqual(config.consumer_secret, "custom_consumer_secret")
|
||||
self.assertEqual(config.token, "custom_token")
|
||||
self.assertEqual(config.token_secret, "custom_token_secret")
|
||||
|
||||
def test_full_config_path(self):
|
||||
""" Ensure a full custom config path can be loaded """
|
||||
self.create_config("path", "Test Path Host")
|
||||
full_path = os.path.abspath(CONFIG_PATH)
|
||||
client = openphoto.OpenPhoto(config_file=os.path.join(full_path, "path"))
|
||||
self.assertEqual(client._host, "Test Path Host")
|
||||
self.assertEqual(client._consumer_key, "path_consumer_key")
|
||||
self.assertEqual(client._consumer_secret, "path_consumer_secret")
|
||||
self.assertEqual(client._token, "path_token")
|
||||
self.assertEqual(client._token_secret, "path_token_secret")
|
||||
client = OpenPhoto(config_file=os.path.join(full_path, "path"))
|
||||
config = client.config
|
||||
self.assertEqual(client.host, "Test Path Host")
|
||||
self.assertEqual(config.consumer_key, "path_consumer_key")
|
||||
self.assertEqual(config.consumer_secret, "path_consumer_secret")
|
||||
self.assertEqual(config.token, "path_token")
|
||||
self.assertEqual(config.token_secret, "path_token_secret")
|
||||
|
||||
def test_host_override(self):
|
||||
""" Ensure that specifying a host overrides the default config """
|
||||
self.create_config("default", "Test Default Host")
|
||||
client = openphoto.OpenPhoto(host="host_override")
|
||||
self.assertEqual(client._host, "host_override")
|
||||
self.assertEqual(client._consumer_key, "")
|
||||
self.assertEqual(client._consumer_secret, "")
|
||||
self.assertEqual(client._token, "")
|
||||
self.assertEqual(client._token_secret, "")
|
||||
client = OpenPhoto(host="host_override")
|
||||
config = client.config
|
||||
self.assertEqual(config.host, "host_override")
|
||||
self.assertEqual(config.consumer_key, "")
|
||||
self.assertEqual(config.consumer_secret, "")
|
||||
self.assertEqual(config.token, "")
|
||||
self.assertEqual(config.token_secret, "")
|
||||
|
||||
def test_missing_config_files_raise_exceptions(self):
|
||||
def test_missing_config_files(self):
|
||||
""" Ensure that missing config files raise exceptions """
|
||||
with self.assertRaises(IOError):
|
||||
openphoto.OpenPhoto()
|
||||
OpenPhoto()
|
||||
with self.assertRaises(IOError):
|
||||
openphoto.OpenPhoto(config_file="custom")
|
||||
OpenPhoto(config_file="custom")
|
||||
|
||||
def test_host_and_config_file_raises_exception(self):
|
||||
def test_host_and_config_file(self):
|
||||
""" It's not valid to specify both a host and a config_file """
|
||||
self.create_config("custom", "Test Custom Host")
|
||||
with self.assertRaises(ValueError):
|
||||
openphoto.OpenPhoto(config_file="custom", host="host_override")
|
||||
OpenPhoto(config_file="custom", host="host_override")
|
||||
|
||||
|
||||
|
|
|
@ -1,8 +1,4 @@
|
|||
import logging
|
||||
try:
|
||||
import unittest2 as unittest # python2.6
|
||||
except ImportError:
|
||||
import unittest
|
||||
|
||||
import openphoto
|
||||
import tests.test_base
|
||||
|
@ -11,28 +7,39 @@ class TestFramework(tests.test_base.TestBase):
|
|||
testcase_name = "framework"
|
||||
|
||||
def setUp(self):
|
||||
"""Override the default setUp, since we don't need a populated database"""
|
||||
logging.info("\nRunning %s..." % self.id())
|
||||
"""
|
||||
Override the default setUp, since we don't need a populated database
|
||||
"""
|
||||
logging.info("\nRunning %s...", self.id())
|
||||
|
||||
def test_api_version_zero(self):
|
||||
# API v0 has a special hello world message
|
||||
"""
|
||||
API v0 has a special hello world message
|
||||
"""
|
||||
client = openphoto.OpenPhoto(config_file=self.config_file,
|
||||
api_version=0)
|
||||
result = client.get("hello.json")
|
||||
self.assertEqual(result['message'], "Hello, world! This is version zero of the API!")
|
||||
self.assertEqual(result['message'],
|
||||
"Hello, world! This is version zero of the API!")
|
||||
self.assertEqual(result['result']['__route__'], "/v0/hello.json")
|
||||
|
||||
def test_specified_api_version(self):
|
||||
# For all API versions >0, we get a generic hello world message
|
||||
"""
|
||||
For all API versions >0, we get a generic hello world message
|
||||
"""
|
||||
for api_version in range(1, tests.test_base.get_test_server_api() + 1):
|
||||
client = openphoto.OpenPhoto(config_file=self.config_file,
|
||||
api_version=api_version)
|
||||
result = client.get("hello.json")
|
||||
self.assertEqual(result['message'], "Hello, world!")
|
||||
self.assertEqual(result['result']['__route__'], "/v%d/hello.json" % api_version)
|
||||
self.assertEqual(result['result']['__route__'],
|
||||
"/v%d/hello.json" % api_version)
|
||||
|
||||
def test_unspecified_api_version(self):
|
||||
# If the API version is unspecified, we get a generic hello world message
|
||||
"""
|
||||
If the API version is unspecified,
|
||||
we get a generic hello world message.
|
||||
"""
|
||||
client = openphoto.OpenPhoto(config_file=self.config_file,
|
||||
api_version=None)
|
||||
result = client.get("hello.json")
|
||||
|
@ -40,9 +47,11 @@ class TestFramework(tests.test_base.TestBase):
|
|||
self.assertEqual(result['result']['__route__'], "/hello.json")
|
||||
|
||||
def test_future_api_version(self):
|
||||
# If the API version is unsupported, we should get an error
|
||||
# (it's a ValueError, since the returned 404 HTML page is not valid JSON)
|
||||
"""
|
||||
If the API version is unsupported, we should get an error
|
||||
(ValueError, since the returned 404 HTML page is not valid JSON)
|
||||
"""
|
||||
client = openphoto.OpenPhoto(config_file=self.config_file,
|
||||
api_version=openphoto.LATEST_API_VERSION + 1)
|
||||
api_version=openphoto.LATEST_API_VERSION + 1)
|
||||
with self.assertRaises(openphoto.OpenPhoto404Error):
|
||||
client.get("hello.json")
|
||||
|
|
|
@ -1,8 +1,4 @@
|
|||
from __future__ import unicode_literals
|
||||
try:
|
||||
import unittest2 as unittest # Python2.6
|
||||
except ImportError:
|
||||
import unittest
|
||||
|
||||
import openphoto
|
||||
import tests.test_base
|
||||
|
|
|
@ -3,7 +3,6 @@ try:
|
|||
except ImportError:
|
||||
import unittest
|
||||
|
||||
import openphoto
|
||||
import tests.test_base
|
||||
|
||||
@unittest.skipIf(tests.test_base.get_test_server_api() == 1,
|
||||
|
@ -47,9 +46,10 @@ class TestTags(tests.test_base.TestBase):
|
|||
# Also remove the tag from the photo
|
||||
self.photos[0].update(tagsRemove=tag_id)
|
||||
|
||||
# TODO: Un-skip and update this tests once there are tag fields that can be updated.
|
||||
# The owner field cannot be updated.
|
||||
@unittest.skip("Can't test the tag.update endpoint, since there are no fields that can be updated")
|
||||
# TODO: Un-skip and update this tests once there are tag fields
|
||||
# that can be updated (the owner field cannot be updated).
|
||||
@unittest.skip("Can't test the tag.update endpoint, "
|
||||
"since there are no fields that can be updated")
|
||||
def test_update(self):
|
||||
""" Test that a tag can be updated """
|
||||
# Update the tag using the OpenPhoto class, passing in the tag object
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue