|
|
5d192ad |
# -*- coding: utf-8 -*-
|
|
|
f759a6b |
import requests
|
|
|
f759a6b |
import unittest
|
|
|
f759a6b |
|
|
|
f759a6b |
from httmock import (all_requests, response, urlmatch, with_httmock, HTTMock,
|
|
|
f759a6b |
remember_called, text_type, binary_type)
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
@urlmatch(scheme='swallow')
def unmatched_scheme(url, request):
    """Handler registered for the ``swallow`` scheme that must never run.

    ``test_scheme_fallback`` installs it ahead of ``any_mock`` for an
    ``http://`` request and expects ``any_mock``'s reply, so being invoked
    at all is reported as a failure.
    """
    message = 'This is outrageous'
    raise AssertionError(message)
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
@urlmatch(path=r'^never$')
def unmatched_path(url, request):
    """Handler registered for the path pattern ``^never$`` that must never run.

    ``test_path_fallback`` installs it ahead of ``any_mock`` for a request to
    ``/`` and expects ``any_mock``'s reply; reaching this body is a failure.
    """
    message = 'This is outrageous'
    raise AssertionError(message)
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
@urlmatch(method='post')
def unmatched_method(url, request):
    """Handler registered for the ``post`` method that must never run.

    ``test_method_fallback`` installs it ahead of ``any_mock`` for a GET
    request and expects ``any_mock``'s reply; reaching this body is a failure.
    """
    message = 'This is outrageous'
    raise AssertionError(message)
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
@urlmatch(netloc=r'(.*\.)?google\.com$', path=r'^/$')
def google_mock(url, request):
    """Return the fixed Google greeting.

    Registered with a netloc pattern for ``google.com`` (optionally with a
    subdomain prefix) and the path pattern ``^/$``.
    """
    greeting = 'Hello from Google'
    return greeting
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
@urlmatch(netloc=r'(.*\.)?google\.com$', path=r'^/$')
@remember_called
def google_mock_count(url, request):
    """Return the fixed Google greeting, wrapped with ``remember_called``.

    The tests read ``google_mock_count.call['called']`` and
    ``google_mock_count.call['count']`` to check whether it was invoked.
    """
    greeting = 'Hello from Google'
    return greeting
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
@urlmatch(scheme='http', netloc=r'(.*\.)?facebook\.com$')
def facebook_mock(url, request):
    """Return the fixed Facebook greeting.

    Registered for the ``http`` scheme with a netloc pattern for
    ``facebook.com`` (optionally with a subdomain prefix).
    """
    greeting = 'Hello from Facebook'
    return greeting
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
@urlmatch(scheme='http', netloc=r'(.*\.)?facebook\.com$')
@remember_called
def facebook_mock_count(url, request):
    """Return the fixed Facebook greeting, wrapped with ``remember_called``.

    The tests read ``facebook_mock_count.call['called']`` and
    ``facebook_mock_count.call['count']`` to check how often it was invoked.
    """
    greeting = 'Hello from Facebook'
    return greeting
|
|
|
f759a6b |
|
|
|
5d192ad |
@urlmatch(netloc=r'(.*\.)?google\.com$', path=r'^/$', method='POST')
@remember_called
def google_mock_store_requests(url, request):
    """Return a fixed reply for POSTs to Google, wrapped with ``remember_called``.

    ``test_store_several_requests`` inspects
    ``google_mock_store_requests.call['requests']`` to read back the request
    that was handled.
    """
    reply = 'Posting at Google'
    return reply
|
|
|
5d192ad |
|
|
|
5d192ad |
|
|
|
5d192ad |
@all_requests
def charset_utf8(url, request):
    """Return a dict response whose body is UTF-8 encoded non-ASCII text.

    The ``Content-Type`` header declares ``charset=utf-8``;
    ``test_encoding_from_contenttype`` checks that the resulting response
    reports that encoding and decodes the body back to the original text.
    """
    body = u'Motörhead'.encode('utf-8')
    headers = {'Content-Type': 'text/plain; charset=utf-8'}
    return {
        'content': body,
        'status_code': 200,
        'headers': headers,
    }
|
|
|
5d192ad |
|
|
|
f759a6b |
|
|
|
f759a6b |
def any_mock(url, request):
    """Return a greeting that names the network location of *url*.

    Undecorated, so it carries no URL filter of its own; *request* is unused.
    """
    host = url.netloc
    return 'Hello from %s' % (host,)
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
def dict_any_mock(url, request):
    """Return a dict-style response greeting the network location of *url*.

    The dict carries the greeting under ``'content'`` and a ``'status_code'``
    of 200; *request* is unused.
    """
    body = 'Hello from %s' % (url.netloc,)
    return {'content': body, 'status_code': 200}
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
def example_400_response(url, response):
    """Build and return a ``requests.Response`` with status 400.

    NOTE(review): the second parameter is named ``response`` here, while every
    other handler in this file names that position ``request``; the name also
    shadows the ``response`` helper imported from httmock. It is unused, and
    the name is kept so the signature stays unchanged.
    """
    bad_request = requests.Response()
    # _content is set directly so the body is available without a transport.
    bad_request._content = b'Bad request.'
    bad_request.status_code = 400
    return bad_request
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
class MockTest(unittest.TestCase):
    """Exercise ``HTTMock`` as a context manager with module-level handlers."""

    def test_return_type(self):
        """A mocked call yields a Response with bytes content and text."""
        with HTTMock(any_mock):
            r = requests.get('http://domain.com/')
        # assertIsInstance names the offending type on failure, whereas
        # assertTrue(isinstance(...)) only reports "False is not true".
        self.assertIsInstance(r, requests.Response)
        self.assertIsInstance(r.content, binary_type)
        self.assertIsInstance(r.text, text_type)

    def test_scheme_fallback(self):
        """A handler for another scheme is skipped in favour of any_mock."""
        with HTTMock(unmatched_scheme, any_mock):
            r = requests.get('http://example.com/')
        self.assertEqual(r.content, b'Hello from example.com')

    def test_path_fallback(self):
        """A handler for another path is skipped in favour of any_mock."""
        with HTTMock(unmatched_path, any_mock):
            r = requests.get('http://example.com/')
        self.assertEqual(r.content, b'Hello from example.com')

    def test_method_fallback(self):
        """A handler for another HTTP method is skipped in favour of any_mock."""
        with HTTMock(unmatched_method, any_mock):
            r = requests.get('http://example.com/')
        self.assertEqual(r.content, b'Hello from example.com')

    def test_netloc_fallback(self):
        """Each host is answered by the handler registered for its netloc."""
        with HTTMock(google_mock, facebook_mock):
            r = requests.get('http://google.com/')
        self.assertEqual(r.content, b'Hello from Google')
        with HTTMock(google_mock, facebook_mock):
            r = requests.get('http://facebook.com/')
        self.assertEqual(r.content, b'Hello from Facebook')

    def test_400_response(self):
        """A handler may return a ready-made ``requests.Response``."""
        with HTTMock(example_400_response):
            r = requests.get('http://example.com/')
        self.assertEqual(r.status_code, 400)
        self.assertEqual(r.content, b'Bad request.')

    def test_real_request_fallback(self):
        """With nested mocks, an unmatched inner request reaches the outer one.

        Neither inner handler is registered for example.com; the expected
        body is the one produced by the outer ``any_mock``.
        """
        with HTTMock(any_mock):
            with HTTMock(google_mock, facebook_mock):
                r = requests.get('http://example.com/')
        self.assertEqual(r.status_code, 200)
        self.assertEqual(r.content, b'Hello from example.com')

    def test_invalid_intercept_response_raises_value_error(self):
        """A handler returning an unsupported value makes the request raise.

        NOTE(review): despite the method name, the exception asserted here
        is ``TypeError``, not ``ValueError``. The name is kept so existing
        test ids remain stable.
        """
        @all_requests
        def response_content(url, request):
            return -1

        with HTTMock(response_content):
            with self.assertRaises(TypeError):
                requests.get('http://example.com/')

    def test_encoding_from_contenttype(self):
        """The response encoding follows the charset in Content-Type."""
        with HTTMock(charset_utf8):
            r = requests.get('http://example.com/')
        self.assertEqual(r.encoding, 'utf-8')
        self.assertEqual(r.text, u'Motörhead')
        self.assertEqual(r.content, r.text.encode('utf-8'))
|
|
|
5d192ad |
|
|
|
f759a6b |
|
|
|
f759a6b |
class DecoratorTest(unittest.TestCase):
    """Check handlers installed through the ``with_httmock`` decorator."""

    @with_httmock(any_mock)
    def test_decorator(self):
        """Inside the decorated test, requests are answered by any_mock."""
        reply = requests.get('http://example.com/')
        self.assertEqual(reply.content, b'Hello from example.com')

    @with_httmock(any_mock)
    def test_iter_lines(self):
        """iter_lines() yields the mocked body as a single bytes line."""
        reply = requests.get('http://example.com/')
        lines = list(reply.iter_lines())
        self.assertEqual(lines, [b'Hello from example.com'])
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
class AllRequestsDecoratorTest(unittest.TestCase):
    """Check ``all_requests`` applied to handlers defined inside a test."""

    def test_all_requests_response(self):
        """A dict return value supplies both status code and body."""
        @all_requests
        def response_content(url, request):
            return {'status_code': 200, 'content': 'Oh hai'}

        with HTTMock(response_content):
            reply = requests.get('https://example.com/')
        self.assertEqual(reply.status_code, 200)
        self.assertEqual(reply.content, b'Oh hai')

    def test_all_str_response(self):
        """A plain string return value becomes the response body."""
        @all_requests
        def response_content(url, request):
            return 'Hello'

        with HTTMock(response_content):
            reply = requests.get('https://example.com/')
        self.assertEqual(reply.content, b'Hello')
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
class AllRequestsMethodDecoratorTest(unittest.TestCase):
    """Check ``all_requests`` applied to methods and used as bound handlers."""

    # Handlers: each takes ``self`` and is passed to HTTMock as a bound method.

    @all_requests
    def response_content(self, url, request):
        """Return a dict response with status 200 and a short body."""
        return {'status_code': 200, 'content': 'Oh hai'}

    @all_requests
    def string_response_content(self, url, request):
        """Return a plain string body."""
        return 'Hello'

    # Tests.

    def test_all_requests_response(self):
        """The bound dict-returning handler answers the request."""
        with HTTMock(self.response_content):
            reply = requests.get('https://example.com/')
        self.assertEqual(reply.status_code, 200)
        self.assertEqual(reply.content, b'Oh hai')

    def test_all_str_response(self):
        """The bound string-returning handler answers the request."""
        with HTTMock(self.string_response_content):
            reply = requests.get('https://example.com/')
        self.assertEqual(reply.content, b'Hello')
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
class UrlMatchMethodDecoratorTest(unittest.TestCase):
    """Check ``urlmatch`` applied to methods and used as bound handlers."""

    @urlmatch(netloc=r'(.*\.)?google\.com$', path=r'^/$')
    def google_mock(self, url, request):
        """Return the fixed Google greeting."""
        return 'Hello from Google'

    @urlmatch(scheme='http', netloc=r'(.*\.)?facebook\.com$')
    def facebook_mock(self, url, request):
        """Return the fixed Facebook greeting."""
        return 'Hello from Facebook'

    @urlmatch(query=r'.*page=test')
    def query_page_mock(self, url, request):
        """Return the greeting for URLs whose query matches ``.*page=test``."""
        return 'Hello from test page'

    def test_netloc_fallback(self):
        """Each host is answered by the handler registered for its netloc.

        The Facebook handler installed here is the module-level
        ``facebook_mock`` function, not the method of the same name.
        """
        cases = (
            ('http://google.com/', b'Hello from Google'),
            ('http://facebook.com/', b'Hello from Facebook'),
        )
        for target, expected in cases:
            with HTTMock(self.google_mock, facebook_mock):
                reply = requests.get(target)
            self.assertEqual(reply.content, expected)

    def test_query(self):
        """The query handler answers only the URL carrying ``page=test``."""
        with HTTMock(self.query_page_mock, self.google_mock):
            with_query = requests.get('http://google.com/?page=test')
            without_query = requests.get('http://google.com/')
        self.assertEqual(with_query.content, b'Hello from test page')
        self.assertEqual(without_query.content, b'Hello from Google')
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
class ResponseTest(unittest.TestCase):
    """Check the ``response`` helper and dict-style handler return values."""

    # Sample JSON-serialisable payload shared by the tests below.
    content = {'name': 'foo', 'ipv4addr': '127.0.0.1'}
    # The payload's keys as a list, used to check list bodies.
    content_list = list(content.keys())

    def test_response_auto_json(self):
        """Dict and list bodies round-trip through ``Response.json()``."""
        r = response(0, self.content)
        # assertIsInstance names the offending type on failure.
        self.assertIsInstance(r.content, binary_type)
        self.assertIsInstance(r.text, text_type)
        self.assertEqual(r.json(), self.content)
        r = response(0, self.content_list)
        self.assertEqual(r.json(), self.content_list)

    def test_response_status_code(self):
        """The first argument becomes the status code."""
        r = response(200)
        self.assertEqual(r.status_code, 200)

    def test_response_headers(self):
        """Headers passed to the helper are readable case-insensitively."""
        r = response(200, None, {'Content-Type': 'application/json'})
        self.assertEqual(r.headers['content-type'], 'application/json')

    def test_response_cookies(self):
        """A Set-Cookie header on the mocked response populates r.cookies."""
        @all_requests
        def response_content(url, request):
            return response(200, 'Foo', {'Set-Cookie': 'foo=bar;'},
                            request=request)

        with HTTMock(response_content):
            r = requests.get('https://example.com/')
        self.assertEqual(len(r.cookies), 1)
        self.assertIn('foo', r.cookies)
        self.assertEqual(r.cookies['foo'], 'bar')

    def test_response_session_cookies(self):
        """The mocked cookie is stored on both the response and the session."""
        @all_requests
        def response_content(url, request):
            return response(200, 'Foo', {'Set-Cookie': 'foo=bar;'},
                            request=request)

        session = requests.Session()
        with HTTMock(response_content):
            r = session.get('https://foo_bar')
        self.assertEqual(len(r.cookies), 1)
        self.assertIn('foo', r.cookies)
        self.assertEqual(r.cookies['foo'], 'bar')
        self.assertEqual(len(session.cookies), 1)
        self.assertIn('foo', session.cookies)
        self.assertEqual(session.cookies['foo'], 'bar')

    def test_python_version_encoding_differences(self):
        """A dict body with a JSON content type decodes via ``json()``."""
        # Previous behavior would result in this test failing in Python3 due
        # to how requests checks for utf-8 JSON content in requests.utils with:
        #
        # TypeError: Can't convert 'bytes' object to str implicitly
        @all_requests
        def get_mock(url, request):
            return {'content': self.content,
                    'headers': {'content-type': 'application/json'},
                    'status_code': 200,
                    'elapsed': 5}

        with HTTMock(get_mock):
            # Named ``resp`` so the imported ``response`` helper is not
            # shadowed inside this test.
            resp = requests.get('http://example.com/')
            self.assertEqual(self.content, resp.json())

    def test_mock_redirect(self):
        """A mocked 302 is followed to the handler for the Location URL."""
        @urlmatch(netloc='example.com')
        def get_mock(url, request):
            return {'status_code': 302,
                    'headers': {'Location': 'http://google.com/'}}

        with HTTMock(get_mock, google_mock):
            # Named ``resp`` so the imported ``response`` helper is not
            # shadowed inside this test.
            resp = requests.get('http://example.com/')
            self.assertEqual(len(resp.history), 1)
            self.assertEqual(resp.content, b'Hello from Google')
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
class StreamTest(unittest.TestCase):
    """Check what ``Response.raw`` yields for mocked requests."""

    @with_httmock(any_mock)
    def test_stream_request(self):
        """With stream=True the raw stream holds a string handler's body."""
        reply = requests.get('http://domain.com/', stream=True)
        raw_body = reply.raw.read()
        self.assertEqual(raw_body, b'Hello from domain.com')

    @with_httmock(dict_any_mock)
    def test_stream_request_with_dict_mock(self):
        """With stream=True the raw stream holds a dict handler's body."""
        reply = requests.get('http://domain.com/', stream=True)
        raw_body = reply.raw.read()
        self.assertEqual(raw_body, b'Hello from domain.com')

    @with_httmock(any_mock)
    def test_non_stream_request(self):
        """Without stream=True the raw stream reads as empty bytes."""
        reply = requests.get('http://domain.com/')
        raw_body = reply.raw.read()
        self.assertEqual(raw_body, b'')
|
|
|
f759a6b |
|
|
|
f759a6b |
|
|
|
f759a6b |
class RememberCalledTest(unittest.TestCase):
    """Check the call bookkeeping exposed by ``remember_called`` handlers."""

    @staticmethod
    def several_calls(count, method, *args, **kwargs):
        """Call ``method(*args, **kwargs)`` *count* times.

        Returns the list of results in call order (empty when *count* is 0).
        """
        return [method(*args, **kwargs) for _ in range(count)]

    def test_several_calls(self):
        """Counts reflect calls made inside one HTTMock context."""
        with HTTMock(google_mock_count, facebook_mock_count):
            results = self.several_calls(
                3, requests.get, 'http://facebook.com/')

        self.assertTrue(facebook_mock_count.call['called'])
        self.assertEqual(facebook_mock_count.call['count'], 3)

        self.assertFalse(google_mock_count.call['called'])
        self.assertEqual(google_mock_count.call['count'], 0)

        for r in results:
            self.assertEqual(r.content, b'Hello from Facebook')

        # Negative case: cleanup call data. After a fresh HTTMock context
        # and one more call, the count is expected to be 1 rather than 4.
        with HTTMock(facebook_mock_count):
            self.several_calls(1, requests.get, 'http://facebook.com/')

        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(facebook_mock_count.call['count'], 1)

    @with_httmock(google_mock_count, facebook_mock_count)
    def test_several_call_decorated(self):
        """Counts accumulate across calls within one decorated test."""
        results = self.several_calls(3, requests.get, 'http://facebook.com/')

        self.assertTrue(facebook_mock_count.call['called'])
        self.assertEqual(facebook_mock_count.call['count'], 3)

        self.assertFalse(google_mock_count.call['called'])
        self.assertEqual(google_mock_count.call['count'], 0)

        for r in results:
            self.assertEqual(r.content, b'Hello from Facebook')

        # Still inside the same decorator-provided mock, so the counter
        # continues from 3 instead of starting over.
        self.several_calls(1, requests.get, 'http://facebook.com/')
        self.assertEqual(facebook_mock_count.call['count'], 4)

    def test_store_several_requests(self):
        """The handled request is kept under ``call['requests']``."""
        with HTTMock(google_mock_store_requests):
            payload = {"query": "foo"}
            requests.post('http://google.com', data=payload)

        self.assertTrue(google_mock_store_requests.call['called'])
        self.assertEqual(google_mock_store_requests.call['count'], 1)
        request = google_mock_store_requests.call['requests'][0]
        self.assertEqual(request.body, 'query=foo')
|