Blob Blame History Raw
From 292c86ef6ffa1f2e6eec1c23dc2b21016487f8c4 Mon Sep 17 00:00:00 2001
From: Zhi Yan Liu <zhiyanl@cn.ibm.com>
Date: Mon, 15 Dec 2014 12:29:55 +0800
Subject: [PATCH] To prevent client use v2 patch api to handle file and swift
 location

The change will be used to restrict client to download and delete any
file in glance-api server. The same resone and logic as what we did in
v1:
https://github.com/openstack/glance/blob/master/glance/api/v1/images.py#L429

Closes-Bug: bug/1400966
DocImpact

Note: Even this change could fully resolve the problem for Glance, but
we still need to fix this issue from glance_store perspective
separatelly due to other projects can use the lib directly.

Conflicts:
	glance/api/v1/images.py
	glance/location.py
	glance/tests/functional/v2/test_images.py
	glance/tests/unit/test_store_location.py
	glance/tests/unit/v1/test_api.py

(cherry-picked from 4afdb017aa1ccef01482f117cb8d0736a6da38ed)
Signed-off-by: Zhi Yan Liu <zhiyanl@cn.ibm.com>

Change-Id: I66764956aa98ec8b66b5e890297d7515c2f2f00c
Resolves: rhbz #1174485
Upstream-Closes-Bug: #1400966
Upstream-Kilo: https://review.openstack.org/#/c/141706/
Upstream-Juno: https://review.openstack.org/#/c/142373/
Upstream-change-Id: I72dbead3cb2dcb87f52658ddb880e26880cc229b
Reviewed-on: https://code.engineering.redhat.com/gerrit/38835
Reviewed-by: Eoghan Glynn <eglynn@redhat.com>
Tested-by: Eoghan Glynn <eglynn@redhat.com>
---
 glance/api/v1/images.py                         |  31 ++---
 glance/common/store_utils.py                    |  22 ++++
 glance/location.py                              |  30 +++--
 glance/tests/functional/v1/test_copy_to_file.py |   4 +-
 glance/tests/functional/v2/test_images.py       | 158 ++++++++++--------------
 glance/tests/unit/test_store_image.py           |   3 +-
 glance/tests/unit/test_store_location.py        |  33 ++++-
 glance/tests/unit/utils.py                      |   9 +-
 glance/tests/unit/v1/test_api.py                |  62 +++++++++-
 9 files changed, 221 insertions(+), 131 deletions(-)

diff --git a/glance/api/v1/images.py b/glance/api/v1/images.py
index c85b301..746f8cd 100644
--- a/glance/api/v1/images.py
+++ b/glance/api/v1/images.py
@@ -23,7 +23,6 @@ import eventlet
 import glance_store as store
 import glance_store.location
 from oslo.config import cfg
-import six.moves.urllib.parse as urlparse
 from webob.exc import HTTPBadRequest
 from webob.exc import HTTPConflict
 from webob.exc import HTTPForbidden
@@ -40,6 +39,7 @@ from glance.api.v1 import filters
 from glance.api.v1 import upload_utils
 from glance.common import exception
 from glance.common import property_utils
+from glance.common import store_utils
 from glance.common import utils
 from glance.common import wsgi
 from glance.i18n import _LE
@@ -415,26 +415,19 @@ class Controller(controller.BaseController):
     @staticmethod
     def _validate_source(source, req):
         """
-        External sources (as specified via the location or copy-from headers)
-        are supported only over non-local store types, i.e. S3, Swift, HTTP.
-        Note the absence of 'file://' for security reasons, see LP bug #942118.
-        'swift+config://' is also absent for security reasons, see LP bug
-        #1334196.
-        If the above constraint is violated, we reject with 400 "Bad Request".
+        To validate if external sources (as specified via the location
+        or copy-from headers) are supported. Otherwise we reject
+        with 400 "Bad Request".
         """
         if source:
-            pieces = urlparse.urlparse(source)
-            schemes = [scheme for scheme in store.get_known_schemes()
-                       if scheme != 'file' and scheme != 'swift+config']
-            for scheme in schemes:
-                if pieces.scheme == scheme:
-                    return source
-            msg = ("External sourcing not supported for "
-                   "store '%s'" % pieces.scheme)
-            LOG.debug(msg)
-            raise HTTPBadRequest(explanation=msg,
-                                 request=req,
-                                 content_type="text/plain")
+            if store_utils.validate_external_location(source):
+                return source
+            else:
+                msg = _("External source are not supported: '%s'") % source
+                LOG.debug(msg)
+                raise HTTPBadRequest(explanation=msg,
+                                     request=req,
+                                     content_type="text/plain")
 
     @staticmethod
     def _copy_from(req):
diff --git a/glance/common/store_utils.py b/glance/common/store_utils.py
index 8f04d39..b7537ce 100644
--- a/glance/common/store_utils.py
+++ b/glance/common/store_utils.py
@@ -16,6 +16,7 @@ import sys
 
 import glance_store as store_api
 from oslo.config import cfg
+import six.moves.urllib.parse as urlparse
 
 from glance.common import utils
 import glance.db as db_api
@@ -119,3 +120,24 @@ def delete_image_location_from_backend(context, image_id, location):
         # such as uploading process failure then we can't use
         # location status mechanism to support image pending delete.
         safe_delete_from_backend(context, image_id, location)
+
+
+def validate_external_location(uri):
+    """
+    Validate if URI of external location are supported.
+
+    Only over non-local store types are OK, i.e. S3, Swift,
+    HTTP. Note the absence of 'file://' for security reasons,
+    see LP bug #942118, 1400966, 'swift+config://' is also
+    absent for security reasons, see LP bug #1334196.
+
+    :param uri: The URI of external image location.
+    :return: Whether given URI of external image location are OK.
+    """
+
+    # TODO(zhiyan): This function could be moved to glance_store.
+
+    pieces = urlparse.urlparse(uri)
+    valid_schemes = [scheme for scheme in store_api.get_known_schemes()
+                     if scheme != 'file' and scheme != 'swift+config']
+    return pieces.scheme in valid_schemes
diff --git a/glance/location.py b/glance/location.py
index fcdba0a..f83fa7a 100644
--- a/glance/location.py
+++ b/glance/location.py
@@ -66,18 +66,20 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
         return result
 
 
-def _check_location_uri(context, store_api, uri):
+def _check_location_uri(context, store_api, store_utils, uri):
     """Check if an image location is valid.
 
     :param context: Glance request context
     :param store_api: store API module
+    :param store_utils: store utils module
     :param uri: location's uri string
     """
+
     is_ok = True
     try:
-        size = store_api.get_size_from_backend(uri, context=context)
         # NOTE(zhiyan): Some stores return zero when it catch exception
-        is_ok = size > 0
+        is_ok = (store_utils.validate_external_location(uri) and
+                 store_api.get_size_from_backend(uri, context=context) > 0)
     except (store.UnknownScheme, store.NotFound):
         is_ok = False
     if not is_ok:
@@ -85,8 +87,8 @@ def _check_location_uri(context, store_api, uri):
         raise exception.BadStoreUri(message=reason)
 
 
-def _check_image_location(context, store_api, location):
-    _check_location_uri(context, store_api, location['url'])
+def _check_image_location(context, store_api, store_utils, location):
+    _check_location_uri(context, store_api, store_utils, location['url'])
     store_api.check_location_metadata(location['metadata'])
 
 
@@ -122,6 +124,7 @@ class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
     def __init__(self, factory, context, store_api, store_utils):
         self.context = context
         self.store_api = store_api
+        self.store_utils = store_utils
         proxy_kwargs = {'context': context, 'store_api': store_api,
                         'store_utils': store_utils}
         super(ImageFactoryProxy, self).__init__(factory,
@@ -131,7 +134,10 @@ class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
     def new_image(self, **kwargs):
         locations = kwargs.get('locations', [])
         for loc in locations:
-            _check_image_location(self.context, self.store_api, loc)
+            _check_image_location(self.context,
+                                  self.store_api,
+                                  self.store_utils,
+                                  loc)
             loc['status'] = 'active'
             if _count_duplicated_locations(locations, loc) > 1:
                 raise exception.DuplicateLocation(location=loc['url'])
@@ -169,7 +175,9 @@ class StoreLocations(collections.MutableSequence):
 
     def insert(self, i, location):
         _check_image_location(self.image_proxy.context,
-                              self.image_proxy.store_api, location)
+                              self.image_proxy.store_api,
+                              self.image_proxy.store_utils,
+                              location)
         location['status'] = 'active'
         if _count_duplicated_locations(self.value, location) > 0:
             raise exception.DuplicateLocation(location=location['url'])
@@ -214,7 +222,9 @@ class StoreLocations(collections.MutableSequence):
 
     def __setitem__(self, i, location):
         _check_image_location(self.image_proxy.context,
-                              self.image_proxy.store_api, location)
+                              self.image_proxy.store_api,
+                              self.image_proxy.store_utils,
+                              location)
         location['status'] = 'active'
         self.value.__setitem__(i, location)
         _set_image_size(self.image_proxy.context,
@@ -303,7 +313,9 @@ def _locations_proxy(target, attr):
                                           '%s') % ori_value)
             # NOTE(zhiyan): Check locations are all valid.
             for location in value:
-                _check_image_location(self.context, self.store_api,
+                _check_image_location(self.context,
+                                      self.store_api,
+                                      self.store_utils,
                                       location)
                 location['status'] = 'active'
                 if _count_duplicated_locations(value, location) > 1:
diff --git a/glance/tests/functional/v1/test_copy_to_file.py b/glance/tests/functional/v1/test_copy_to_file.py
index 15bb708..b64eac6 100644
--- a/glance/tests/functional/v1/test_copy_to_file.py
+++ b/glance/tests/functional/v1/test_copy_to_file.py
@@ -250,7 +250,7 @@ class TestCopyToFile(functional.FunctionalTest):
         response, content = http.request(path, 'POST', headers=headers)
         self.assertEqual(response.status, 400, content)
 
-        expected = 'External sourcing not supported for store \'file\''
+        expected = 'External source are not supported: \'%s\'' % copy_from
         msg = 'expected "%s" in "%s"' % (expected, content)
         self.assertTrue(expected in content, msg)
 
@@ -276,7 +276,7 @@ class TestCopyToFile(functional.FunctionalTest):
         response, content = http.request(path, 'POST', headers=headers)
         self.assertEqual(response.status, 400, content)
 
-        expected = 'External sourcing not supported for store \'swift+config\''
+        expected = 'External source are not supported: \'swift+config://xxx\''
         msg = 'expected "%s" in "%s"' % (expected, content)
         self.assertTrue(expected in content, msg)
 
diff --git a/glance/tests/functional/v2/test_images.py b/glance/tests/functional/v2/test_images.py
index 14fe3c7..4c32375 100644
--- a/glance/tests/functional/v2/test_images.py
+++ b/glance/tests/functional/v2/test_images.py
@@ -16,7 +16,6 @@
 import BaseHTTPServer
 import os
 import signal
-import tempfile
 import uuid
 
 import requests
@@ -47,7 +46,7 @@ def get_handler_class(fixture):
             self.end_headers()
             return
 
-        def log_message(*args, **kwargs):
+        def log_message(self, *args, **kwargs):
             # Override this method to prevent debug output from going
             # to stderr during testing
             return
@@ -75,6 +74,18 @@ class TestImages(functional.FunctionalTest):
         self.cleanup()
         self.api_server.deployment_flavor = 'noauth'
         self.api_server.data_api = 'glance.db.sqlalchemy.api'
+        for i in range(3):
+            ret = http_server("foo_image_id%d" % i, "foo_image%d" % i)
+            setattr(self, 'http_server%d_pid' % i, ret[0])
+            setattr(self, 'http_port%d' % i, ret[1])
+
+    def tearDown(self):
+        for i in range(3):
+            pid = getattr(self, 'http_server%d_pid' % i, None)
+            if pid:
+                os.kill(pid, signal.SIGKILL)
+
+        super(TestImages, self).tearDown()
 
     def _url(self, path):
         return 'http://127.0.0.1:%d%s' % (self.api_port, path)
@@ -329,21 +340,15 @@ class TestImages(functional.FunctionalTest):
         self.assertEqual(413, response.status_code, response.text)
 
         # Adding 3 image locations should fail since configured limit is 2
-        for i in range(3):
-            file_path = os.path.join(self.test_dir, 'fake_image_%i' % i)
-            with open(file_path, 'w') as fap:
-                fap.write('glance')
-
         path = self._url('/v2/images/%s' % image_id)
         media_type = 'application/openstack-images-v2.1-json-patch'
         headers = self._headers({'content-type': media_type})
         changes = []
         for i in range(3):
+            url = ('http://127.0.0.1:%s/foo_image' %
+                   getattr(self, 'http_port%d' % i))
             changes.append({'op': 'add', 'path': '/locations/-',
-                            'value': {'url': 'file://{0}'.format(
-                                os.path.join(self.test_dir,
-                                             'fake_image_%i' % i)),
-                                      'metadata': {}},
+                            'value': {'url': url, 'metadata': {}},
                             })
 
         data = jsonutils.dumps(changes)
@@ -2176,17 +2181,14 @@ class TestImages(functional.FunctionalTest):
         self.assertNotIn('size', image)
         self.assertNotIn('virtual_size', image)
 
-        file_path = os.path.join(self.test_dir, 'fake_image')
-        with open(file_path, 'w') as fap:
-            fap.write('glance')
-
         # Update locations for the queued image
         path = self._url('/v2/images/%s' % image_id)
         media_type = 'application/openstack-images-v2.1-json-patch'
         headers = self._headers({'content-type': media_type})
+        url = 'http://127.0.0.1:%s/foo_image' % self.http_port0
         data = jsonutils.dumps([{'op': 'replace', 'path': '/locations',
-                                 'value': [{'url': 'file://' + file_path,
-                                            'metadata': {}}]}])
+                                 'value': [{'url': url, 'metadata': {}}]
+                                 }])
         response = requests.patch(path, headers=headers, data=data)
         self.assertEqual(200, response.status_code, response.text)
 
@@ -2195,7 +2197,42 @@ class TestImages(functional.FunctionalTest):
         response = requests.get(path, headers=headers)
         self.assertEqual(200, response.status_code)
         image = jsonutils.loads(response.text)
-        self.assertEqual(image['size'], 6)
+        self.assertEqual(image['size'], 10)
+
+    def test_update_locations_with_restricted_sources(self):
+        self.start_servers(**self.__dict__.copy())
+        # Create an image
+        path = self._url('/v2/images')
+        headers = self._headers({'content-type': 'application/json'})
+        data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
+                                'container_format': 'aki'})
+        response = requests.post(path, headers=headers, data=data)
+        self.assertEqual(201, response.status_code)
+
+        # Returned image entity should have a generated id and status
+        image = jsonutils.loads(response.text)
+        image_id = image['id']
+        self.assertEqual('queued', image['status'])
+        self.assertNotIn('size', image)
+        self.assertNotIn('virtual_size', image)
+
+        # Update locations for the queued image
+        path = self._url('/v2/images/%s' % image_id)
+        media_type = 'application/openstack-images-v2.1-json-patch'
+        headers = self._headers({'content-type': media_type})
+        data = jsonutils.dumps([{'op': 'replace', 'path': '/locations',
+                                 'value': [{'url': 'file:///foo_image',
+                                            'metadata': {}}]
+                                 }])
+        response = requests.patch(path, headers=headers, data=data)
+        self.assertEqual(400, response.status_code, response.text)
+
+        data = jsonutils.dumps([{'op': 'replace', 'path': '/locations',
+                                 'value': [{'url': 'swift+config:///foo_image',
+                                            'metadata': {}}]
+                                 }])
+        response = requests.patch(path, headers=headers, data=data)
+        self.assertEqual(400, response.status_code, response.text)
 
 
 class TestImagesWithRegistry(TestImages):
@@ -2421,16 +2458,16 @@ class TestImageLocationSelectionStrategy(functional.FunctionalTest):
         super(TestImageLocationSelectionStrategy, self).setUp()
         self.cleanup()
         self.api_server.deployment_flavor = 'noauth'
-        self.foo_image_file = tempfile.NamedTemporaryFile()
-        self.foo_image_file.write("foo image file")
-        self.foo_image_file.flush()
-        self.addCleanup(self.foo_image_file.close)
-        ret = http_server("foo_image_id", "foo_image")
-        self.http_server_pid, self.http_port = ret
+        for i in range(3):
+            ret = http_server("foo_image_id%d" % i, "foo_image%d" % i)
+            setattr(self, 'http_server%d_pid' % i, ret[0])
+            setattr(self, 'http_port%d' % i, ret[1])
 
     def tearDown(self):
-        if self.http_server_pid is not None:
-            os.kill(self.http_server_pid, signal.SIGKILL)
+        for i in range(3):
+            pid = getattr(self, 'http_server%d_pid' % i, None)
+            if pid:
+                os.kill(pid, signal.SIGKILL)
 
         super(TestImageLocationSelectionStrategy, self).tearDown()
 
@@ -2483,69 +2520,10 @@ class TestImageLocationSelectionStrategy(functional.FunctionalTest):
         path = self._url('/v2/images/%s' % image_id)
         media_type = 'application/openstack-images-v2.1-json-patch'
         headers = self._headers({'content-type': media_type})
-        values = [{'url': 'file://%s' % self.foo_image_file.name,
-                   'metadata': {'idx': '1'}},
-                  {'url': 'http://127.0.0.1:%s/foo_image' % self.http_port,
-                   'metadata': {'idx': '0'}}]
-        doc = [{'op': 'replace',
-                'path': '/locations',
-                'value': values}]
-        data = jsonutils.dumps(doc)
-        response = requests.patch(path, headers=headers, data=data)
-        self.assertEqual(200, response.status_code)
-
-        # Image locations should be visible
-        path = self._url('/v2/images/%s' % image_id)
-        headers = self._headers({'Content-Type': 'application/json'})
-        response = requests.get(path, headers=headers)
-        self.assertEqual(200, response.status_code)
-        image = jsonutils.loads(response.text)
-        self.assertTrue('locations' in image)
-        self.assertEqual(image['locations'], values)
-        self.assertTrue('direct_url' in image)
-        self.assertEqual(image['direct_url'], values[0]['url'])
-
-        self.stop_servers()
-
-    def test_image_locatons_with_store_type_strategy(self):
-        self.api_server.show_image_direct_url = True
-        self.api_server.show_multiple_locations = True
-        self.image_location_quota = 10
-        self.api_server.location_strategy = 'store_type'
-        preference = "http, swift, filesystem"
-        self.api_server.store_type_location_strategy_preference = preference
-        self.start_servers(**self.__dict__.copy())
-
-        # Create an image
-        path = self._url('/v2/images')
-        headers = self._headers({'content-type': 'application/json'})
-        data = jsonutils.dumps({'name': 'image-1', 'type': 'kernel',
-                                'foo': 'bar', 'disk_format': 'aki',
-                                'container_format': 'aki'})
-        response = requests.post(path, headers=headers, data=data)
-        self.assertEqual(201, response.status_code)
-
-        # Get the image id
-        image = jsonutils.loads(response.text)
-        image_id = image['id']
-
-        # Image locations should not be visible before location is set
-        path = self._url('/v2/images/%s' % image_id)
-        headers = self._headers({'Content-Type': 'application/json'})
-        response = requests.get(path, headers=headers)
-        self.assertEqual(200, response.status_code)
-        image = jsonutils.loads(response.text)
-        self.assertTrue('locations' in image)
-        self.assertTrue(image["locations"] == [])
-
-        # Update image locations via PATCH
-        path = self._url('/v2/images/%s' % image_id)
-        media_type = 'application/openstack-images-v2.1-json-patch'
-        headers = self._headers({'content-type': media_type})
-        values = [{'url': 'file://%s' % self.foo_image_file.name,
-                   'metadata': {'idx': '1'}},
-                  {'url': 'http://127.0.0.1:%s/foo_image' % self.http_port,
-                   'metadata': {'idx': '0'}}]
+        values = [{'url': 'http://127.0.0.1:%s/foo_image' % self.http_port0,
+                   'metadata': {}},
+                  {'url': 'http://127.0.0.1:%s/foo_image' % self.http_port1,
+                   'metadata': {}}]
         doc = [{'op': 'replace',
                 'path': '/locations',
                 'value': values}]
@@ -2553,8 +2531,6 @@ class TestImageLocationSelectionStrategy(functional.FunctionalTest):
         response = requests.patch(path, headers=headers, data=data)
         self.assertEqual(200, response.status_code)
 
-        values.sort(key=lambda loc: int(loc['metadata']['idx']))
-
         # Image locations should be visible
         path = self._url('/v2/images/%s' % image_id)
         headers = self._headers({'Content-Type': 'application/json'})
diff --git a/glance/tests/unit/test_store_image.py b/glance/tests/unit/test_store_image.py
index 665f126..8b334ab 100644
--- a/glance/tests/unit/test_store_image.py
+++ b/glance/tests/unit/test_store_image.py
@@ -18,6 +18,7 @@ import glance_store
 
 from glance.common import exception
 import glance.location
+from glance.tests.unit import base as unit_test_base
 from glance.tests.unit import utils as unit_test_utils
 from glance.tests import utils
 
@@ -759,7 +760,7 @@ class TestStoreImageRepo(utils.BaseTestCase):
         self.assertEqual(acls['read'], [TENANT2])
 
 
-class TestImageFactory(utils.BaseTestCase):
+class TestImageFactory(unit_test_base.StoreClearingUnitTest):
 
     def setUp(self):
         super(TestImageFactory, self).setUp()
diff --git a/glance/tests/unit/test_store_location.py b/glance/tests/unit/test_store_location.py
index 884221b..c9ee44c 100644
--- a/glance/tests/unit/test_store_location.py
+++ b/glance/tests/unit/test_store_location.py
@@ -17,6 +17,8 @@ import mock
 
 import glance_store
 
+from glance.common import exception
+from glance.common import store_utils
 import glance.location
 from glance.tests.unit import base
 
@@ -32,11 +34,13 @@ CONF = {'default_store': 'file',
 
 class TestStoreLocation(base.StoreClearingUnitTest):
 
+    class FakeImageProxy():
+        size = None
+        context = None
+        store_api = mock.Mock()
+        store_utils = store_utils
+
     def test_add_location_for_image_without_size(self):
-        class FakeImageProxy():
-            size = None
-            context = None
-            store_api = mock.Mock()
 
         def fake_get_size_from_backend(uri, context=None):
             return 1
@@ -49,14 +53,31 @@ class TestStoreLocation(base.StoreClearingUnitTest):
             loc2 = {'url': 'file:///fake2.img.tar.gz', 'metadata': {}}
 
             # Test for insert location
-            image1 = FakeImageProxy()
+            image1 = TestStoreLocation.FakeImageProxy()
             locations = glance.location.StoreLocations(image1, [])
             locations.insert(0, loc2)
             self.assertEqual(image1.size, 1)
 
             # Test for set_attr of _locations_proxy
-            image2 = FakeImageProxy()
+            image2 = TestStoreLocation.FakeImageProxy()
             locations = glance.location.StoreLocations(image2, [loc1])
             locations[0] = loc2
             self.assertIn(loc2, locations)
             self.assertEqual(image2.size, 1)
+
+    def test_add_location_with_restricted_sources(self):
+
+        loc1 = {'url': 'file:///fake1.img.tar.gz', 'metadata': {}}
+        loc2 = {'url': 'swift+config:///xxx', 'metadata': {}}
+
+        # Test for insert location
+        image1 = TestStoreLocation.FakeImageProxy()
+        locations = glance.location.StoreLocations(image1, [])
+        self.assertRaises(exception.BadStoreUri, locations.insert, 0, loc1)
+        self.assertNotIn(loc1, locations)
+
+        # Test for set_attr of _locations_proxy
+        image2 = TestStoreLocation.FakeImageProxy()
+        locations = glance.location.StoreLocations(image2, [loc1])
+        self.assertRaises(exception.BadStoreUri, locations.insert, 0, loc2)
+        self.assertNotIn(loc2, locations)
diff --git a/glance/tests/unit/utils.py b/glance/tests/unit/utils.py
index df59160..f7c8d56 100644
--- a/glance/tests/unit/utils.py
+++ b/glance/tests/unit/utils.py
@@ -14,12 +14,13 @@
 #    under the License.
 
 import urllib
-import urlparse
 
 import glance_store as store
 from oslo.config import cfg
+import six.moves.urllib.parse as urlparse
 
 from glance.common import exception
+from glance.common import store_utils
 from glance.common import wsgi
 import glance.context
 import glance.db.simple.api as simple_db
@@ -135,6 +136,12 @@ class FakeStoreUtils(object):
         else:
             self.safe_delete_from_backend(context, image_id, location)
 
+    def validate_external_location(self, uri):
+        if uri and urlparse.urlparse(uri).scheme:
+            return store_utils.validate_external_location(uri)
+        else:
+            return True
+
 
 class FakeStoreAPI(object):
     def __init__(self, store_metadata=None):
diff --git a/glance/tests/unit/v1/test_api.py b/glance/tests/unit/v1/test_api.py
index bd2182e..4ec136d 100644
--- a/glance/tests/unit/v1/test_api.py
+++ b/glance/tests/unit/v1/test_api.py
@@ -419,7 +419,7 @@ class TestGlanceAPI(base.IsolatedUnitTest):
 
         res = req.get_response(self.api)
         self.assertEqual(res.status_int, 400)
-        self.assertIn('External sourcing not supported', res.body)
+        self.assertIn('External source are not supported', res.body)
 
     def test_create_with_location_bad_store_uri(self):
         fixture_headers = {
@@ -1006,7 +1006,7 @@ class TestGlanceAPI(base.IsolatedUnitTest):
             res = req.get_response(self.api)
             self.assertEqual(res.status_int, 409)
 
-    def test_add_location_with_invalid_location(self):
+    def test_add_location_with_invalid_location_on_conflict_image_size(self):
         """Tests creates an image from location and conflict image size"""
         fixture_headers = {'x-image-meta-store': 'file',
                            'x-image-meta-disk-format': 'vhd',
@@ -1023,6 +1023,36 @@ class TestGlanceAPI(base.IsolatedUnitTest):
         res = req.get_response(self.api)
         self.assertEqual(res.status_int, 400)
 
+    def test_add_location_with_invalid_location_on_restricted_sources(self):
+        """Tests creates an image from location and restricted sources"""
+        fixture_headers = {'x-image-meta-store': 'file',
+                           'x-image-meta-disk-format': 'vhd',
+                           'x-image-meta-location': 'file:///etc/passwd',
+                           'x-image-meta-container-format': 'ovf',
+                           'x-image-meta-name': 'fake image #F'}
+
+        req = webob.Request.blank("/images")
+        req.headers['Content-Type'] = 'application/octet-stream'
+        req.method = 'POST'
+        for k, v in fixture_headers.iteritems():
+            req.headers[k] = v
+        res = req.get_response(self.api)
+        self.assertEqual(400, res.status_int)
+
+        fixture_headers = {'x-image-meta-store': 'file',
+                           'x-image-meta-disk-format': 'vhd',
+                           'x-image-meta-location': 'swift+config://xxx',
+                           'x-image-meta-container-format': 'ovf',
+                           'x-image-meta-name': 'fake image #F'}
+
+        req = webob.Request.blank("/images")
+        req.headers['Content-Type'] = 'application/octet-stream'
+        req.method = 'POST'
+        for k, v in fixture_headers.iteritems():
+            req.headers[k] = v
+        res = req.get_response(self.api)
+        self.assertEqual(400, res.status_int)
+
     def test_add_copy_from_with_location(self):
         """Tests creates an image from copy-from and location"""
         fixture_headers = {'x-image-meta-store': 'file',
@@ -1039,6 +1069,34 @@ class TestGlanceAPI(base.IsolatedUnitTest):
         res = req.get_response(self.api)
         self.assertEqual(res.status_int, 400)
 
+    def test_add_copy_from_with_restricted_sources(self):
+        """Tests creates an image from copy-from with restricted sources"""
+        fixture_headers = {'x-image-meta-store': 'file',
+                           'x-image-meta-disk-format': 'vhd',
+                           'x-glance-api-copy-from': 'file:///etc/passwd',
+                           'x-image-meta-container-format': 'ovf',
+                           'x-image-meta-name': 'fake image #F'}
+
+        req = webob.Request.blank("/images")
+        req.method = 'POST'
+        for k, v in six.iteritems(fixture_headers):
+            req.headers[k] = v
+        res = req.get_response(self.api)
+        self.assertEqual(400, res.status_int)
+
+        fixture_headers = {'x-image-meta-store': 'file',
+                           'x-image-meta-disk-format': 'vhd',
+                           'x-glance-api-copy-from': 'swift+config://xxx',
+                           'x-image-meta-container-format': 'ovf',
+                           'x-image-meta-name': 'fake image #F'}
+
+        req = webob.Request.blank("/images")
+        req.method = 'POST'
+        for k, v in six.iteritems(fixture_headers):
+            req.headers[k] = v
+        res = req.get_response(self.api)
+        self.assertEqual(400, res.status_int)
+
     def test_add_copy_from_upload_image_unauthorized_with_body(self):
         rules = {"upload_image": '!', "modify_image": '@',
                  "add_image": '@'}