Merge branch 'sneakypete81-better_config' into development

This commit is contained in:
sneakypete81 2013-08-16 17:54:30 +01:00
commit cb0ea24515
5 changed files with 111 additions and 54 deletions

View file

@ -77,11 +77,16 @@ API Versioning
==============
It may be useful to lock your application to a particular version of the Trovebox API.
This ensures that future API updates won't cause unexpected breakages.
To do this, configure your Trovebox client as follows:
To do this, add the optional ``api_version`` parameter when creating the client object::
client.configure(api_version=2)
from trovebox import Trovebox
client = Trovebox(api_version=2)
SSL Verification
================
If you connect to your Trovebox server over HTTPS, its SSL certificate is automatically verified.
You can configure your Trovebox client to bypass this verification step:
client.configure(ssl_verify=False)
Commandline Tool
================

View file

@ -10,7 +10,7 @@ from trovebox import Trovebox
CONFIG_HOME_PATH = os.path.join("tests", "config")
CONFIG_PATH = os.path.join(CONFIG_HOME_PATH, "trovebox")
class TestConfig(unittest.TestCase):
class TestAuth(unittest.TestCase):
def setUp(self):
""" Override XDG_CONFIG_HOME env var, to use test configs """
try:
@ -42,47 +42,47 @@ class TestConfig(unittest.TestCase):
""" Ensure the default config is loaded """
self.create_config("default", "Test Default Host")
client = Trovebox()
config = client.config
auth = client.auth
self.assertEqual(client.host, "Test Default Host")
self.assertEqual(config.consumer_key, "default_consumer_key")
self.assertEqual(config.consumer_secret, "default_consumer_secret")
self.assertEqual(config.token, "default_token")
self.assertEqual(config.token_secret, "default_token_secret")
self.assertEqual(auth.consumer_key, "default_consumer_key")
self.assertEqual(auth.consumer_secret, "default_consumer_secret")
self.assertEqual(auth.token, "default_token")
self.assertEqual(auth.token_secret, "default_token_secret")
def test_custom_config(self):
""" Ensure a custom config can be loaded """
self.create_config("default", "Test Default Host")
self.create_config("custom", "Test Custom Host")
client = Trovebox(config_file="custom")
config = client.config
auth = client.auth
self.assertEqual(client.host, "Test Custom Host")
self.assertEqual(config.consumer_key, "custom_consumer_key")
self.assertEqual(config.consumer_secret, "custom_consumer_secret")
self.assertEqual(config.token, "custom_token")
self.assertEqual(config.token_secret, "custom_token_secret")
self.assertEqual(auth.consumer_key, "custom_consumer_key")
self.assertEqual(auth.consumer_secret, "custom_consumer_secret")
self.assertEqual(auth.token, "custom_token")
self.assertEqual(auth.token_secret, "custom_token_secret")
def test_full_config_path(self):
""" Ensure a full custom config path can be loaded """
self.create_config("path", "Test Path Host")
full_path = os.path.abspath(CONFIG_PATH)
client = Trovebox(config_file=os.path.join(full_path, "path"))
config = client.config
auth = client.auth
self.assertEqual(client.host, "Test Path Host")
self.assertEqual(config.consumer_key, "path_consumer_key")
self.assertEqual(config.consumer_secret, "path_consumer_secret")
self.assertEqual(config.token, "path_token")
self.assertEqual(config.token_secret, "path_token_secret")
self.assertEqual(auth.consumer_key, "path_consumer_key")
self.assertEqual(auth.consumer_secret, "path_consumer_secret")
self.assertEqual(auth.token, "path_token")
self.assertEqual(auth.token_secret, "path_token_secret")
def test_host_override(self):
""" Ensure that specifying a host overrides the default config """
self.create_config("default", "Test Default Host")
client = Trovebox(host="host_override")
config = client.config
self.assertEqual(config.host, "host_override")
self.assertEqual(config.consumer_key, "")
self.assertEqual(config.consumer_secret, "")
self.assertEqual(config.token, "")
self.assertEqual(config.token_secret, "")
auth = client.auth
self.assertEqual(auth.host, "host_override")
self.assertEqual(auth.consumer_key, "")
self.assertEqual(auth.consumer_secret, "")
self.assertEqual(auth.token, "")
self.assertEqual(auth.token_secret, "")
def test_missing_config_files(self):
""" Ensure that missing config files raise exceptions """

View file

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import os
import json
import mock
import httpretty
try:
import unittest2 as unittest # Python2.6
@ -44,7 +45,7 @@ class TestHttp(unittest.TestCase):
def test_attributes(self):
"""Check that the host attribute has been set correctly"""
self.assertEqual(self.client.host, self.test_host)
self.assertEqual(self.client.config.host, self.test_host)
self.assertEqual(self.client.auth.host, self.test_host)
@httpretty.activate
def test_get_with_http_scheme(self):
@ -219,7 +220,8 @@ class TestHttp(unittest.TestCase):
@httpretty.activate
def test_get_with_api_version(self):
"""Check that an API version can be specified for the get method"""
self.client = trovebox.Trovebox(host=self.test_host, api_version=1)
self.client = trovebox.Trovebox(host=self.test_host)
self.client.configure(api_version=1)
self._register_uri(httpretty.GET,
uri="http://%s/v1/%s" % (self.test_host,
self.test_endpoint))
@ -228,13 +230,39 @@ class TestHttp(unittest.TestCase):
@httpretty.activate
def test_post_with_api_version(self):
"""Check that an API version can be specified for the post method"""
self.client = trovebox.Trovebox(host=self.test_host, api_version=1,
**self.test_oauth)
self.client = trovebox.Trovebox(host=self.test_host, **self.test_oauth)
self.client.configure(api_version=1)
self._register_uri(httpretty.POST,
uri="http://%s/v1/%s" % (self.test_host,
self.test_endpoint))
self.client.post(self.test_endpoint)
@mock.patch.object(trovebox.http.requests, 'Session')
def test_get_with_ssl_verify_disabled(self, mock_session):
"""Check that SSL verification can be disabled for the get method"""
session = mock_session.return_value.__enter__.return_value
session.get.return_value.text = "response text"
session.get.return_value.status_code = 200
session.get.return_value.json.return_value = self.test_data
self.client = trovebox.Trovebox(host=self.test_host, **self.test_oauth)
self.client.configure(ssl_verify=False)
self.client.get(self.test_endpoint)
self.assertEqual(session.verify, False)
@mock.patch.object(trovebox.http.requests, 'Session')
def test_post_with_ssl_verify_disabled(self, mock_session):
"""Check that SSL verification can be disabled for the post method"""
session = mock_session.return_value.__enter__.return_value
session.post.return_value.text = "response text"
session.post.return_value.status_code = 200
session.post.return_value.json.return_value = self.test_data
self.client = trovebox.Trovebox(host=self.test_host, **self.test_oauth)
self.client.configure(ssl_verify=False)
self.client.post(self.test_endpoint)
self.assertEqual(session.verify, False)
@httpretty.activate
def test_post_file(self):
"""Check that a file can be posted"""

View file

@ -1,5 +1,5 @@
"""
config.py : OAuth Config File Parser
auth.py : OAuth Config File Parser
"""
from __future__ import unicode_literals
import os
@ -12,7 +12,7 @@ try:
except ImportError:
import StringIO as io # Python2
class Config(object):
class Auth(object):
def __init__(self, config_file, host,
consumer_key, consumer_secret,
token, token_secret):

View file

@ -13,7 +13,7 @@ except ImportError:
from .objects import TroveboxObject
from .errors import *
from .config import Config
from .auth import Auth
if sys.version < '3':
TEXT_TYPE = unicode
@ -26,33 +26,55 @@ DUPLICATE_RESPONSE = {"code": 409,
class Http(object):
"""
Base class to handle HTTP requests to an Trovebox server.
If no parameters are specified, config is loaded from the default
location (~/.config/trovebox/default).
If no parameters are specified, auth config is loaded from the
default location (~/.config/trovebox/default).
The config_file parameter is used to specify an alternate config file.
If the host parameter is specified, no config file is loaded and
OAuth tokens (consumer*, token*) can optionally be specified.
All requests will include the api_version path, if specified.
This should be used to ensure that your application will continue to work
even if the Trovebox API is updated to a new revision.
"""
_CONFIG_DEFAULTS = {"api_version" : None,
"ssl_verify" : True,
}
def __init__(self, config_file=None, host=None,
consumer_key='', consumer_secret='',
token='', token_secret='', api_version=None):
self._api_version = api_version
self.config = dict(self._CONFIG_DEFAULTS)
if api_version is not None:
print("Deprecation Warning: api_version should be set by "
"calling the configure function")
self.config["api_version"] = api_version
self._logger = logging.getLogger("trovebox")
self.config = Config(config_file, host,
consumer_key, consumer_secret,
token, token_secret)
self.auth = Auth(config_file, host,
consumer_key, consumer_secret,
token, token_secret)
self.host = self.config.host
self.host = self.auth.host
# Remember the most recent HTTP request and response
self.last_url = None
self.last_params = None
self.last_response = None
def configure(self, **kwds):
"""
Update Trovebox HTTP client configuration.
:param api_version: Include a Trovebox API version in all requests.
This can be used to ensure that your application will continue
to work even if the Trovebox API is updated to a new revision.
[default: None]
:param ssl_verify: If true, HTTPS SSL certificates will always be
verified [default: True]
"""
for item in kwds:
self.config[item] = kwds[item]
def get(self, endpoint, process_response=True, **params):
"""
Performs an HTTP GET from the specified endpoint (API path),
@ -67,15 +89,16 @@ class Http(object):
params = self._process_params(params)
url = self._construct_url(endpoint)
if self.config.consumer_key:
auth = requests_oauthlib.OAuth1(self.config.consumer_key,
self.config.consumer_secret,
self.config.token,
self.config.token_secret)
if self.auth.consumer_key:
auth = requests_oauthlib.OAuth1(self.auth.consumer_key,
self.auth.consumer_secret,
self.auth.token,
self.auth.token_secret)
else:
auth = None
with requests.Session() as session:
session.verify = self.config["ssl_verify"]
response = session.get(url, params=params, auth=auth)
self._logger.info("============================")
@ -106,14 +129,15 @@ class Http(object):
params = self._process_params(params)
url = self._construct_url(endpoint)
if not self.config.consumer_key:
if not self.auth.consumer_key:
raise TroveboxError("Cannot issue POST without OAuth tokens")
auth = requests_oauthlib.OAuth1(self.config.consumer_key,
self.config.consumer_secret,
self.config.token,
self.config.token_secret)
auth = requests_oauthlib.OAuth1(self.auth.consumer_key,
self.auth.consumer_secret,
self.auth.token,
self.auth.token_secret)
with requests.Session() as session:
session.verify = self.config["ssl_verify"]
if files:
# Need to pass parameters as URL query, so they get OAuth signed
response = session.post(url, params=params,
@ -153,8 +177,8 @@ class Http(object):
if not endpoint.startswith("/"):
endpoint = "/" + endpoint
if self._api_version is not None:
endpoint = "/v%d%s" % (self._api_version, endpoint)
if self.config["api_version"] is not None:
endpoint = "/v%d%s" % (self.config["api_version"], endpoint)
return urlunparse((scheme, host, endpoint, '', '', ''))
@staticmethod