diff --git a/server/odcs/server/api_utils.py b/server/odcs/server/api_utils.py
index 295974f..6d93f36 100644
--- a/server/odcs/server/api_utils.py
+++ b/server/odcs/server/api_utils.py
@@ -30,47 +30,45 @@ from odcs.common.types import (
COMPOSE_RESULTS, COMPOSE_FLAGS, INVERSE_PUNGI_SOURCE_TYPE_NAMES)
-def _load_allowed_clients_attrs(key, attrs):
+def _set_default_client_allowed_attrs(ret_attrs, attrs):
"""
- Loads attributes from the
- conf.allowed_clients[key][user_name/group_name] dict. If the requested
- attribute is not found in the loaded dict, the conf.allowed_$attr_name
- is used as a default.
+ Helper method adding default allowed_client_attrs into `ret_attrs`.
- :param str key: "users" or "groups".
- :param list attrs: List of attribute names to load from the dict.
- :return: Dict with loaded attributes.
+ If some requested attributes from `attrs` are missing in `ret_attrs` list,
+ try to get them from "conf.allowed_$attr_name". If they are not there
+ too, use empty list to disallow everything.
"""
- clients = conf.allowed_clients.get(key, {})
- ret_attrs = []
- if key == "users":
- # Check if the user is defined in clients, if not, return None.
- if flask.g.user.username in clients:
- ret_attrs = dict(copy.deepcopy(clients[flask.g.user.username]))
- else:
- return None
- elif key == "groups":
- # Check if the group is defined in clients, if not, return None
- for group in flask.g.groups:
- if group in clients:
- ret_attrs = dict(copy.deepcopy(clients[group]))
- break
- else:
- return None
- else:
- raise ValueError(
- "Unknown key %r passed to _load_allowed_clients_attrs" % key)
-
- # If some requested attributes are missing in allowed_clients variable,
- # try to get them from "conf.allowed_$attr_name". If they are not there
- # too, use empty list to disallow everything.
for attr in attrs:
if attr not in ret_attrs:
ret_attrs[attr] = getattr(conf, "allowed_%s" % attr, [])
-
return ret_attrs
+def _load_allowed_clients_attr(attrs):
+ """
+ Loads attributes from the conf.allowed_clients dict based on the logged
+ in user and its groups. If the requested attribute is not found in
+ the loaded dict, the conf.allowed_$attr_name is used as a default.
+
+ :param list attrs: List of attribute names to load from the dict.
+ :return: Generator of Dicts with loaded attributes.
+ """
+ # Check if logged user exists in "users" definition of allowed_clients.
+ clients = conf.allowed_clients.get("users", {})
+ if flask.g.user.username in clients:
+ ret_attrs = dict(copy.deepcopy(clients[flask.g.user.username]))
+ ret_attrs = _set_default_client_allowed_attrs(ret_attrs, attrs)
+ yield ret_attrs
+ else:
+ # Check if group is defined in allowed_clients "groups".
+ clients = conf.allowed_clients.get("groups", {})
+ for group in flask.g.groups:
+ if group in clients:
+ ret_attrs = dict(copy.deepcopy(clients[group]))
+ ret_attrs = _set_default_client_allowed_attrs(ret_attrs, attrs)
+ yield ret_attrs
+
+
def _enum_int_to_str_list(enum_dict, val):
"""
Convenient method converting int value to list of strings
@@ -105,42 +103,47 @@ def raise_if_input_not_allowed(**kwargs):
if conf.auth_backend == 'noauth':
return
- # Prefer args for particular user - these overrides group ones.
- attrs = _load_allowed_clients_attrs("users", kwargs.keys())
- if not attrs:
- attrs = _load_allowed_clients_attrs("groups", kwargs.keys())
- if not attrs:
- raise Forbidden("User %s not allowed to operate with any "
- "compose" % flask.g.user.username)
-
- for name, values in kwargs.items():
- if name not in attrs:
- # This should not happen, but be defensive in this part of code...
- raise Forbidden(
- "User %s not allowed to operate with compose with %s=%r"
- % (flask.g.user.username, name, values))
-
- # Conver integers from db format to string list.
- if name == "source_types":
- values = INVERSE_PUNGI_SOURCE_TYPE_NAMES[values]
- elif name == "flags":
- values = _enum_int_to_str_list(COMPOSE_FLAGS, values)
- elif name == "results":
- values = _enum_int_to_str_list(COMPOSE_RESULTS, values)
-
- if type(values) == int:
- values = [values]
- elif isinstance(values, six.string_types):
- # `arches` and `sources` are white-space separated lists.
- values = values.split(" ")
-
- for value in values:
- allowed_values = attrs[name]
- if ((not allowed_values or value not in allowed_values) and
- allowed_values != [""]):
- raise Forbidden(
- "User %s not allowed to operate with compose with %s=%s"
- % (flask.g.user.username, name, value))
+ errors = set()
+ for attrs in _load_allowed_clients_attr(kwargs.keys()):
+ found_error = False
+ for name, values in kwargs.items():
+ if name not in attrs:
+ # This should not happen, but be defensive in this part of code...
+ errors.add(
+ "User %s not allowed to operate with compose with %s=%r."
+ % (flask.g.user.username, name, values))
+ continue
+
+ # Convert integers from db format to string list.
+ if name == "source_types":
+ values = INVERSE_PUNGI_SOURCE_TYPE_NAMES[values]
+ elif name == "flags":
+ values = _enum_int_to_str_list(COMPOSE_FLAGS, values)
+ elif name == "results":
+ values = _enum_int_to_str_list(COMPOSE_RESULTS, values)
+
+ if type(values) == int:
+ values = [values]
+ elif isinstance(values, six.string_types):
+ # `arches` and `sources` are white-space separated lists.
+ values = values.split(" ")
+
+ for value in values:
+ allowed_values = attrs[name]
+ if ((not allowed_values or value not in allowed_values) and
+ allowed_values != [""]):
+ errors.add(
+ "User %s not allowed to operate with compose with %s=%s."
+ % (flask.g.user.username, name, value))
+ found_error = True
+ break
+ if not found_error:
+ return
+ if errors:
+ raise Forbidden(" ".join(list(errors)))
+ else:
+ raise Forbidden(
+ "User %s not allowed to operate with any compose." % flask.g.user.username)
def validate_json_data(dict_or_list, level=0, last_dict_key=None):
diff --git a/server/tests/test_views.py b/server/tests/test_views.py
index bb59541..0441bd0 100644
--- a/server/tests/test_views.py
+++ b/server/tests/test_views.py
@@ -105,6 +105,9 @@ class ViewBaseTest(ModelsBaseTest):
'composer': {},
'dev2': {
'source_types': ['module']
+ },
+ 'dev3': {
+ 'source_types': ['raw_config']
}
},
'users': {
@@ -114,6 +117,9 @@ class ViewBaseTest(ModelsBaseTest):
'dev2': {
'source_types': ['module', 'raw_config'],
'compose_types': ["test", "nightly"]
+ },
+ 'dev3': {
+ 'source_types': ['tag']
}
}
}
@@ -692,7 +698,7 @@ class TestViews(ViewBaseTest):
self.assertEqual(
data['message'],
- 'User dev not allowed to operate with compose with source_types=repo')
+ 'User dev not allowed to operate with compose with source_types=repo.')
@patch.object(odcs.server.config.Config, 'raw_config_urls',
new={"pungi_cfg": "http://localhost/pungi.conf#%s"})
@@ -709,7 +715,7 @@ class TestViews(ViewBaseTest):
self.assertEqual(
data['message'],
- 'User dev2 not allowed to operate with compose with compose_types=production')
+ 'User dev2 not allowed to operate with compose with compose_types=production.')
def test_submit_build_unknown_source_type(self):
with self.test_request_context(user='dev'):
@@ -777,7 +783,7 @@ class TestViews(ViewBaseTest):
self.assertEqual(
data['message'],
- 'User dev2 not allowed to operate with compose with source_types=tag')
+ 'User dev2 not allowed to operate with compose with source_types=tag.')
def test_submit_build_per_group_source_type_allowed(self):
with self.test_request_context(user="unknown", groups=['dev2', "x"]):
@@ -804,7 +810,7 @@ class TestViews(ViewBaseTest):
self.assertEqual(
data['message'],
- 'User unknown not allowed to operate with compose with source_types=tag')
+ 'User unknown not allowed to operate with compose with source_types=tag.')
def test_query_compose(self):
resp = self.client.get('/api/1/composes/1')
@@ -988,6 +994,38 @@ class TestViews(ViewBaseTest):
c = db.session.query(Compose).filter(Compose.source == 'testmodule:rawhide').one()
self.assertEqual(c.state, COMPOSE_STATES["wait"])
+ def test_can_create_compose_with_user_in_multiple_groups(self):
+ with self.test_request_context(user='another_user', groups=['dev3', 'dev2']):
+ flask.g.oidc_scopes = [
+ '{0}{1}'.format(conf.oidc_base_namespace, 'new-compose')
+ ]
+
+ resp = self.client.post('/api/1/composes/', data=json.dumps(
+ {'source': {'type': 'module', 'source': 'testmodule:rawhide'}}))
+ db.session.expire_all()
+
+ self.assertEqual(resp.status, '200 OK')
+ self.assertEqual(resp.status_code, 200)
+ c = db.session.query(Compose).filter(Compose.source == 'testmodule:rawhide').one()
+ self.assertEqual(c.state, COMPOSE_STATES["wait"])
+
+ def test_cannot_create_compose_with_user_in_multiple_groups(self):
+ with self.test_request_context(user='another_user', groups=['dev3', 'dev2']):
+ flask.g.oidc_scopes = [
+ '{0}{1}'.format(conf.oidc_base_namespace, 'new-compose')
+ ]
+
+ resp = self.client.post('/api/1/composes/', data=json.dumps(
+ {'source': {'type': 'tag', 'source': 'testmodule:rawhide'}}))
+ data = json.loads(resp.get_data(as_text=True))
+ db.session.expire_all()
+
+ self.assertEqual(resp.status, '403 FORBIDDEN')
+ self.assertEqual(resp.status_code, 403)
+ self.assertEqual(
+ data['message'],
+ 'User another_user not allowed to operate with compose with source_types=tag.')
+
def test_can_delete_compose_with_user_in_configured_groups(self):
c3 = Compose.create(
db.session, "unknown", PungiSourceType.MODULE, "testmodule:testbranch",
@@ -1010,6 +1048,19 @@ class TestViews(ViewBaseTest):
six.assertRegex(self, data['message'],
r"The delete request for compose \(id=%s\) has been accepted and will be processed by backend later." % c3.id)
+ def test_can_create_compose_with_permission_overriden_by_username(self):
+ with self.test_request_context(user='dev3', groups=['dev2']):
+ flask.g.oidc_scopes = [
+ '{0}{1}'.format(conf.oidc_base_namespace, 'new-compose')
+ ]
+
+ resp = self.client.post('/api/1/composes/', data=json.dumps(
+ {'source': {'type': 'module', 'source': 'testmodule:rawhide'}}))
+ db.session.expire_all()
+
+ self.assertEqual(resp.status, '403 FORBIDDEN')
+ self.assertEqual(resp.status_code, 403)
+
@patch.object(odcs.server.config.Config, 'max_seconds_to_live', new_callable=PropertyMock)
@patch.object(odcs.server.config.Config, 'seconds_to_live', new_callable=PropertyMock)
def test_use_seconds_to_live_in_request(self, mock_seconds_to_live, mock_max_seconds_to_live):