From b65c43322d20fa30560fa139ff5e27581c5878b9 Mon Sep 17 00:00:00 2001 From: Jeremy Cline Date: Jan 12 2018 17:17:06 +0000 Subject: Update to latest upstream Signed-off-by: Jeremy Cline --- diff --git a/.gitignore b/.gitignore index 6657dd9..794d641 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ /fmn-1.3.1.tar.gz /fmn-2.0.0.tar.gz /fmn-2.0.1.tar.gz +/fmn-2.0.2.tar.gz diff --git a/0001-Refactor-the-tasks-formatting-code.patch b/0001-Refactor-the-tasks-formatting-code.patch new file mode 100644 index 0000000..134aeef --- /dev/null +++ b/0001-Refactor-the-tasks-formatting-code.patch @@ -0,0 +1,197 @@ +From 9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73 Mon Sep 17 00:00:00 2001 +Message-Id: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org> +From: Jeremy Cline +Date: Wed, 20 Dec 2017 14:24:59 -0500 +Subject: [PATCH 1/3] Refactor the tasks formatting code + +Signed-off-by: Jeremy Cline +--- + fmn/tasks.py | 77 ++++++++++++++++++++++++++++++++----------------- + fmn/tests/test_tasks.py | 65 +++++++++++++++++++++++++++++++++++++++++ + 2 files changed, 115 insertions(+), 27 deletions(-) + +diff --git a/fmn/tasks.py b/fmn/tasks.py +index 4cadb6e..034ac6c 100644 +--- a/fmn/tasks.py ++++ b/fmn/tasks.py +@@ -207,22 +207,7 @@ class _FindRecipients(task.Task): + models.QueuedMessage.enqueue(session, user, context, message) + continue + +- formatted_message = None +- if context == 'email': +- formatted_message = formatters.email(message['body'], recipient) +- elif context == 'irc': +- formatted_message = formatters.irc(message['body'], recipient) +- elif context == 'sse': +- try: +- formatted_message = formatters.sse(message['body'], recipient) +- except Exception: +- _log.exception('An exception occurred formatting the message ' +- 'for delivery: falling back to sending the raw fedmsg') +- formatted_message = message +- +- if formatted_message is None: +- raise exceptions.FmnError( +- 'The message was not formatted in any way, aborting!') ++ formatted_message = _format(context, message, recipient) + + _log.info('Queuing message for delivery to %s on the %s backend', user, context) + backend_message = { +@@ -237,6 +222,49 @@ class _FindRecipients(task.Task): + session.commit() + + ++def _format(context, message, recipient): ++ """ ++ Format the message(s) using the context and recipient to determine settings. ++ ++ Args: ++ context (str): The name of the context; this is used to determine what formatter ++ function to use. ++ message (dict or list): A fedmsg or list of fedmsgs to format. ++ recipient (dict): A recipient dictionary passed on to the formatter function. ++ ++ Raises: ++ FmnError: If the message could not be formatted. ++ """ ++ formatted_message = None ++ ++ # If it's a dictionary, it's a single message that doesn't need batching ++ if isinstance(message, dict): ++ if context == 'email': ++ formatted_message = formatters.email(message['body'], recipient) ++ elif context == 'irc': ++ formatted_message = formatters.irc(message['body'], recipient) ++ elif context == 'sse': ++ try: ++ formatted_message = formatters.sse(message['body'], recipient) ++ except Exception: ++ _log.exception('An exception occurred formatting the message ' ++ 'for delivery: falling back to sending the raw fedmsg') ++ formatted_message = message ++ elif isinstance(message, list): ++ if context == 'email': ++ formatted_message = formatters.email_batch( ++ [m['body'] for m in message], recipient) ++ elif context == 'irc': ++ formatted_message = formatters.irc_batch( ++ [m['body'] for m in message], recipient) ++ ++ if formatted_message is None: ++ raise exceptions.FmnError( ++ 'The message was not formatted in any way, aborting!') ++ ++ return formatted_message ++ ++ + @app.task(name='fmn.tasks.batch_messages', ignore_results=True) + def batch_messages(): + """ +@@ -273,17 +301,12 @@ def batch_messages(): + for value in pref.detail_values + ] + for recipient in recipients: +- formatted_message = None +- if pref.context.name == 'email': +- formatted_message = formatters.email_batch( +- [m['body'] for m in messages], recipient) +- elif pref.context.name == 'irc': +- formatted_message = formatters.irc_batch( +- [m['body'] for m in messages], recipient) +- if formatted_message is None: +- _log.error('A batch message for %r was not formatted, skipping!', +- recipient) +- continue ++ try: ++ formatted_message = _format(pref.context.name, messages, recipient) ++ except exceptions.FmnError: ++ _log.error('A batch message for %r was not formatted, skipping!', ++ recipient) ++ continue + + backend_message = { + "context": pref.context.name, +diff --git a/fmn/tests/test_tasks.py b/fmn/tests/test_tasks.py +index 0939a8e..80356b4 100644 +--- a/fmn/tests/test_tasks.py ++++ b/fmn/tests/test_tasks.py +@@ -25,6 +25,7 @@ from kombu import Queue + import mock + + from fmn import tasks, lib as fmn_lib, constants ++from fmn.exceptions import FmnError + from fmn.lib import models + from fmn.tests import Base + +@@ -333,3 +334,67 @@ email notifications@fedoraproject.org if you have any concerns/issues/abuse.""" + confirmation = models.Confirmation.query.all() + self.assertEqual(1, len(confirmation)) + self.assertEqual('valid', confirmation[0].status) ++ ++ ++@mock.patch('fmn.tasks.formatters') ++class FormatTests(Base): ++ ++ def setUp(self): ++ super(FormatTests, self).setUp() ++ self.message = { ++ 'body': { ++ "msg": { ++ "changed": "rules", ++ "context": "email", ++ "openid": "jcline.id.fedoraproject.org" ++ }, ++ "msg_id": "2017-6aa71d5b-fbe4-49e7-afdd-afcf0d22802b", ++ "timestamp": 1507310730, ++ "topic": "org.fedoraproject.dev.fmn.filter.update", ++ "username": "vagrant", ++ } ++ } ++ self.recipient = { ++ "email address": "jeremy@jcline.org", ++ "filter_id": 11, ++ "filter_name": "test", ++ "filter_oneshot": False, ++ "markup_messages": False, ++ "shorten_links": False, ++ "triggered_by_links": False, ++ "user": "jcline.id.fedoraproject.org", ++ "verbose": True, ++ } ++ def test_failure(self, mock_formatters): ++ """Assert an exception is raised if there's no formatted message.""" ++ mock_formatters.email.return_value = None ++ self.assertRaises(FmnError, tasks._format, 'email', self.message, self.recipient) ++ ++ mock_formatters.email.assert_called_once_with(self.message['body'], self.recipient) ++ ++ ++ def test_single_email(self, mock_formatters): ++ """Assert single messages for email context use the correct formatter.""" ++ tasks._format('email', self.message, self.recipient) ++ ++ mock_formatters.email.assert_called_once_with(self.message['body'], self.recipient) ++ ++ def test_multi_email(self, mock_formatters): ++ """Assert multiple messages for email context use the correct formatter.""" ++ tasks._format('email', [self.message, self.message], self.recipient) ++ ++ mock_formatters.email_batch.assert_called_once_with( ++ [self.message['body'], self.message['body']], self.recipient) ++ ++ def test_single_irc(self, mock_formatters): ++ """Assert single messages for irc context use the correct formatter.""" ++ tasks._format('irc', self.message, self.recipient) ++ ++ mock_formatters.irc.assert_called_once_with(self.message['body'], self.recipient) ++ ++ def test_multi_irc(self, mock_formatters): ++ """Assert multiple messages for irc context use the correct formatter.""" ++ tasks._format('irc', [self.message, self.message], self.recipient) ++ ++ mock_formatters.irc_batch.assert_called_once_with( ++ [self.message['body'], self.message['body']], self.recipient) +-- +2.15.1 + diff --git a/0002-Refactor-the-tasks-to-add-error-handling.patch b/0002-Refactor-the-tasks-to-add-error-handling.patch new file mode 100644 index 0000000..7a4f54c --- /dev/null +++ b/0002-Refactor-the-tasks-to-add-error-handling.patch @@ -0,0 +1,490 @@ +From 7dd2fa4d15fe6c2ba28270a2a5617b99b9b9e9bd Mon Sep 17 00:00:00 2001 +Message-Id: <7dd2fa4d15fe6c2ba28270a2a5617b99b9b9e9bd.1515776905.git.jeremy@jcline.org> +In-Reply-To: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org> +References: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org> +From: Jeremy Cline +Date: Thu, 21 Dec 2017 09:41:28 -0500 +Subject: [PATCH 2/3] Refactor the tasks to add error handling + +Add SQLAlchemy error handling to the tasks. + +fixes #268 + +Signed-off-by: Jeremy Cline +--- + fmn/tasks.py | 262 +++++++++++++++++++++++++++++------------------- + fmn/tests/test_tasks.py | 120 +++++++++++++++++++++- + 2 files changed, 280 insertions(+), 102 deletions(-) + +diff --git a/fmn/tasks.py b/fmn/tasks.py +index 034ac6c..3b6ea35 100644 +--- a/fmn/tasks.py ++++ b/fmn/tasks.py +@@ -35,6 +35,7 @@ from celery import task + import fedmsg + import fedmsg.meta + import fedmsg_meta_fedora_infrastructure ++import sqlalchemy + + from . import config, lib as fmn_lib, formatters, exceptions + from . import fmn_fasshim +@@ -183,7 +184,6 @@ class _FindRecipients(task.Task): + dictionary. See :func:`fmn.lib.recipients` for the dictionary format. + message (dict): The raw fedmsg to humanize and deliver to the given recipients. + """ +- session = models.Session() + broker_url = config.app_conf['celery']['broker'] + + with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn: +@@ -192,19 +192,11 @@ class _FindRecipients(task.Task): + _log.info('Dispatching messages for %d recipients for the %s backend', + len(recipients), context) + for recipient in recipients: ++ _maybe_mark_filter_fired(recipient) ++ + user = recipient['user'] + preference = self.user_preferences['{}_{}'.format(user, context)] +- +- if ('filter_oneshot' in recipient and recipient['filter_oneshot']): +- _log.info('Marking one-time filter as fired') +- idx = recipient['filter_id'] +- fltr = models.Filter.query.get(idx) +- fltr.fired(session) +- +- if preference.get('batch_delta') or preference.get('batch_count'): +- _log.info('User "%s" has batch delivery set; placing message in database', +- user) +- models.QueuedMessage.enqueue(session, user, context, message) ++ if _batch(preference, context, recipient, message): + continue + + formatted_message = _format(context, message, recipient) +@@ -219,7 +211,57 @@ class _FindRecipients(task.Task): + routing_key = BACKEND_QUEUE_PREFIX + context + producer.publish(backend_message, routing_key=routing_key, + declare=[Queue(routing_key, durable=True)]) +- session.commit() ++ ++ ++def _maybe_mark_filter_fired(recipient): ++ """ ++ If the filter was a one-shot filter, try to mark it as triggered. If that fails, ++ log the error and continue since there's not much else to be done. ++ ++ Args: ++ recipient (dict): The recipient dictionary. ++ """ ++ ++ if ('filter_oneshot' in recipient and recipient['filter_oneshot']): ++ _log.info('Marking one-time filter as fired') ++ session = models.Session() ++ idx = recipient['filter_id'] ++ try: ++ fltr = models.Filter.query.get(idx) ++ fltr.fired(session) ++ session.commit() ++ except (sqlalchemy.exc.SQLAlchemyError, AttributeError): ++ _log.exception('Unable to mark one-shot filter (id %s) as fired', idx) ++ session.rollback() ++ finally: ++ models.Session.remove() ++ ++ ++def _batch(preference, context, recipient, message): ++ """ ++ Batch the message if the user wishes it. ++ ++ Args: ++ preference (dict): The user's preferences in dictionary form. ++ context (str): The context to batch it for. ++ recipient (dict): The recipient dictionary. ++ message (dict): The fedmsg to batch. ++ """ ++ if preference.get('batch_delta') or preference.get('batch_count'): ++ _log.info('User "%s" has batch delivery set; placing message in database', ++ recipient['user']) ++ session = models.Session() ++ try: ++ models.QueuedMessage.enqueue(session, recipient['user'], context, message) ++ session.commit() ++ return True ++ except sqlalchemy.exc.SQLAlchemyError: ++ _log.exception('Unable to queue message for batch delivery') ++ session.rollback() ++ finally: ++ models.Session.remove() ++ ++ return False + + + def _format(context, message, recipient): +@@ -278,49 +320,56 @@ def batch_messages(): + This is intended to be run as a periodic task using Celery's beat service. + """ + session = models.Session() +- broker_url = config.app_conf['celery']['broker'] +- with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn: +- producer = conn.Producer() +- for pref in models.Preference.list_batching(session): +- if not _batch_ready(pref): +- continue +- +- queued_messages = models.QueuedMessage.list_for( +- session, pref.user, pref.context) +- _log.info('Batching %d queued messages for %s', len(queued_messages), pref.user.openid) +- +- messages = [m.message for m in queued_messages] +- recipients = [ +- { +- pref.context.detail_name: value.value, +- 'user': pref.user.openid, +- 'markup_messages': pref.markup_messages, +- 'triggered_by_links': pref.triggered_by_links, +- 'shorten_links': pref.shorten_links, +- } +- for value in pref.detail_values +- ] +- for recipient in recipients: +- try: +- formatted_message = _format(pref.context.name, messages, recipient) +- except exceptions.FmnError: +- _log.error('A batch message for %r was not formatted, skipping!', +- recipient) ++ try: ++ broker_url = config.app_conf['celery']['broker'] ++ with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn: ++ producer = conn.Producer() ++ for pref in models.Preference.list_batching(session): ++ if not _batch_ready(pref): + continue + +- backend_message = { +- "context": pref.context.name, +- "recipient": recipient, +- "fedmsg": messages, +- "formatted_message": formatted_message, +- } +- routing_key = BACKEND_QUEUE_PREFIX + pref.context.name +- producer.publish(backend_message, routing_key=routing_key, +- declare=[Queue(routing_key, durable=True)]) ++ queued_messages = models.QueuedMessage.list_for( ++ session, pref.user, pref.context) ++ _log.info('Batching %d queued messages for %s', ++ len(queued_messages), pref.user.openid) + +- for message in queued_messages: +- message.dequeue(session) +- session.commit() ++ messages = [m.message for m in queued_messages] ++ recipients = [ ++ { ++ pref.context.detail_name: value.value, ++ 'user': pref.user.openid, ++ 'markup_messages': pref.markup_messages, ++ 'triggered_by_links': pref.triggered_by_links, ++ 'shorten_links': pref.shorten_links, ++ } ++ for value in pref.detail_values ++ ] ++ for recipient in recipients: ++ try: ++ formatted_message = _format(pref.context.name, messages, recipient) ++ except exceptions.FmnError: ++ _log.error('A batch message for %r was not formatted, skipping!', ++ recipient) ++ continue ++ ++ backend_message = { ++ "context": pref.context.name, ++ "recipient": recipient, ++ "fedmsg": messages, ++ "formatted_message": formatted_message, ++ } ++ routing_key = BACKEND_QUEUE_PREFIX + pref.context.name ++ producer.publish(backend_message, routing_key=routing_key, ++ declare=[Queue(routing_key, durable=True)]) ++ ++ for message in queued_messages: ++ message.dequeue(session) ++ session.commit() ++ except sqlalchemy.exc.SQLAlchemyError: ++ _log.exception('Failed to dispatch queued messages for delivery') ++ session.rollback() ++ finally: ++ models.Session.remove() + + + def _batch_ready(preference): +@@ -334,23 +383,27 @@ def _batch_ready(preference): + bool: True if there's a batch ready. + """ + session = models.Session() +- count = models.QueuedMessage.count_for(session, preference.user, preference.context) +- if not count: +- return False +- +- # Batch based on count +- if preference.batch_count is not None and preference.batch_count <= count: +- _log.info("Sending digest for %r per msg count", preference.user.openid) +- return True +- +- # Batch based on time +- earliest = models.QueuedMessage.earliest_for( +- session, preference.user, preference.context) +- now = datetime.datetime.utcnow() +- delta = datetime.timedelta.total_seconds(now - earliest.created_on) +- if preference.batch_delta is not None and preference.batch_delta <= delta: +- _log.info("Sending digest for %r per time delta", preference.user.openid) +- return True ++ try: ++ count = models.QueuedMessage.count_for(session, preference.user, preference.context) ++ if not count: ++ return False ++ ++ # Batch based on count ++ if preference.batch_count is not None and preference.batch_count <= count: ++ _log.info("Sending digest for %r per msg count", preference.user.openid) ++ return True ++ ++ # Batch based on time ++ earliest = models.QueuedMessage.earliest_for( ++ session, preference.user, preference.context) ++ now = datetime.datetime.utcnow() ++ delta = datetime.timedelta.total_seconds(now - earliest.created_on) ++ if preference.batch_delta is not None and preference.batch_delta <= delta: ++ _log.info("Sending digest for %r per time delta", preference.user.openid) ++ return True ++ except sqlalchemy.exc.SQLAlchemyError: ++ _log.exception('Failed to determine if the batch is ready for %s', preference.user) ++ session.rollback() + + return False + +@@ -375,37 +428,44 @@ def confirmations(): + This is intended to be dispatched regularly via celery beat. + """ + session = models.Session() +- models.Confirmation.delete_expired(session) +- pending = models.Confirmation.query.filter_by(status='pending').all() +- broker_url = config.app_conf['celery']['broker'] +- with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn: +- producer = conn.Producer() +- for confirmation in pending: +- message = None +- if confirmation.context.name == 'email': +- message = formatters.email_confirmation(confirmation) +- else: +- # The way the irc backend is currently written, it has to format the +- # confirmation itself. For now, just send an empty message, but in the +- # future it may be worth refactoring the irc backend to let us format here. +- message = '' +- recipient = { +- confirmation.context.detail_name: confirmation.detail_value, +- 'user': confirmation.user.openid, +- 'triggered_by_links': False, +- 'confirmation': True, +- } +- backend_message = { +- "context": confirmation.context.name, +- "recipient": recipient, +- "fedmsg": {}, +- "formatted_message": message, +- } +- _log.info('Dispatching confirmation message for %r', confirmation) +- confirmation.set_status(session, 'valid') +- routing_key = BACKEND_QUEUE_PREFIX + confirmation.context.name +- producer.publish(backend_message, routing_key=routing_key, +- declare=[Queue(routing_key, durable=True)]) ++ try: ++ models.Confirmation.delete_expired(session) ++ pending = models.Confirmation.query.filter_by(status='pending').all() ++ broker_url = config.app_conf['celery']['broker'] ++ with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn: ++ producer = conn.Producer() ++ for confirmation in pending: ++ message = None ++ if confirmation.context.name == 'email': ++ message = formatters.email_confirmation(confirmation) ++ else: ++ # The way the irc backend is currently written, it has to format the ++ # confirmation itself. For now, just send an empty message, but in the ++ # future it may be worth refactoring the irc backend to let us format here. ++ message = '' ++ recipient = { ++ confirmation.context.detail_name: confirmation.detail_value, ++ 'user': confirmation.user.openid, ++ 'triggered_by_links': False, ++ 'confirmation': True, ++ } ++ backend_message = { ++ "context": confirmation.context.name, ++ "recipient": recipient, ++ "fedmsg": {}, ++ "formatted_message": message, ++ } ++ _log.info('Dispatching confirmation message for %r', confirmation) ++ confirmation.set_status(session, 'valid') ++ routing_key = BACKEND_QUEUE_PREFIX + confirmation.context.name ++ producer.publish(backend_message, routing_key=routing_key, ++ declare=[Queue(routing_key, durable=True)]) ++ session.commit() ++ except sqlalchemy.exc.SQLAlchemyError: ++ _log.exception('Unable to handle confirmations') ++ session.rollback() ++ finally: ++ models.Session.remove() + + + #: A Celery task that accepts a message as input and determines the recipients. +diff --git a/fmn/tests/test_tasks.py b/fmn/tests/test_tasks.py +index 80356b4..5fadde0 100644 +--- a/fmn/tests/test_tasks.py ++++ b/fmn/tests/test_tasks.py +@@ -23,6 +23,7 @@ import datetime + from dogpile import cache + from kombu import Queue + import mock ++import sqlalchemy + + from fmn import tasks, lib as fmn_lib, constants + from fmn.exceptions import FmnError +@@ -365,6 +366,7 @@ class FormatTests(Base): + "user": "jcline.id.fedoraproject.org", + "verbose": True, + } ++ + def test_failure(self, mock_formatters): + """Assert an exception is raised if there's no formatted message.""" + mock_formatters.email.return_value = None +@@ -372,7 +374,6 @@ class FormatTests(Base): + + mock_formatters.email.assert_called_once_with(self.message['body'], self.recipient) + +- + def test_single_email(self, mock_formatters): + """Assert single messages for email context use the correct formatter.""" + tasks._format('email', self.message, self.recipient) +@@ -398,3 +399,120 @@ class FormatTests(Base): + + mock_formatters.irc_batch.assert_called_once_with( + [self.message['body'], self.message['body']], self.recipient) ++ ++ ++class MaybeMarkFilterFiredTests(Base): ++ """Tests for :func:`tasks._maybe_mark_filter_fired`.""" ++ ++ def test_no_filter_oneshot(self): ++ """Assert if the filter_oneshot key is missing, None is returned.""" ++ self.assertTrue(tasks._maybe_mark_filter_fired({}) is None) ++ ++ def test_filter_oneshot_false(self): ++ """Assert if the filter_oneshot key is missing, None is returned.""" ++ self.assertTrue(tasks._maybe_mark_filter_fired({'filter_oneshot': False}) is None) ++ ++ @mock.patch('fmn.tasks._log') ++ def test_invalid_filter_id(self, mock_log): ++ """Assert if the filter_id references an invalid key, nothing bad happens.""" ++ recipient = {'filter_oneshot': True, 'filter_id': 8675309} ++ ++ self.assertTrue(tasks._maybe_mark_filter_fired(recipient) is None) ++ mock_log.exception.assert_called_once_with( ++ 'Unable to mark one-shot filter (id %s) as fired', 8675309) ++ ++ def test_valid_oneshot_filter(self): ++ """Assert oneshot filters are properly deactivated.""" ++ session = models.Session() ++ filt = models.Filter(active=True, oneshot=True) ++ session.add(filt) ++ session.commit() ++ recipient = {'filter_oneshot': True, 'filter_id': filt.id} ++ ++ self.assertTrue(tasks._maybe_mark_filter_fired(recipient) is None) ++ self.assertEqual(1, models.Filter.query.count()) ++ self.assertFalse(models.Filter.query.one().active) ++ ++ ++class BatchTests(Base): ++ """Tests for :func:`tasks._batch`.""" ++ ++ def setUp(self): ++ super(BatchTests, self).setUp() ++ user = models.User( ++ openid='jcline.id.fedoraproject.org', openid_url='http://jcline.id.fedoraproject.org') ++ self.sess.add(user) ++ context = models.Context( ++ name='sse', description='description', detail_name='SSE', icon='wat') ++ self.sess.add(context) ++ self.sess.commit() ++ fmn_lib.defaults.create_defaults_for(self.sess, user, detail_values={'sse': 'jcline'}) ++ self.sess.commit() ++ preference = models.Preference.query.filter_by( ++ context_name='sse', openid='jcline.id.fedoraproject.org').first() ++ preference.enabled = True ++ self.sess.add(preference) ++ self.sess.commit() ++ ++ self. message = { ++ "topic": "org.fedoraproject.prod.buildsys.build.state.change", ++ 'body': { ++ "username": "apache", ++ "i": 1, ++ "timestamp": 1505399391.0, ++ "msg_id": "2017-7c65d9ff-85c0-42bb-8288-9b6112cb3da2", ++ "topic": "org.fedoraproject.prod.buildsys.build.state.change", ++ "msg": { ++ "build_id": 970796, ++ "old": 0, ++ "name": "fedmsg", ++ "task_id": 21861152, ++ "attribute": "state", ++ "request": [ ++ ("git://pkgs.fedoraproject.org/rpms/fedmsg?#" ++ "870987e84539239a22170475bbf13ac4d2ef4382"), ++ "f26-candidate", ++ {}, ++ ], ++ "instance": "primary", ++ "version": "1.0.1", ++ "owner": "jcline", ++ "new": 1, ++ "release": "4.fc26" ++ } ++ } ++ } ++ ++ self.recipient = { ++ 'triggered_by_links': True, ++ 'markup_messages': False, ++ 'user': u'jcline.id.fedoraproject.org', ++ 'filter_name': u'Events referring to my username', ++ u'SSE': u'jcline', ++ 'filter_oneshot': False, ++ 'filter_id': 2, ++ 'shorten_links': False, ++ 'verbose': True ++ } ++ ++ def test_no_batch_delta_or_count(self): ++ """Assert False is returned if the message isn't batched.""" ++ self.assertFalse(tasks._batch({}, 'sse', self.recipient, self.message)) ++ ++ def test_batch_count(self): ++ """Assert the batch_count keyword triggers batching.""" ++ self.assertTrue(tasks._batch({'batch_count': True}, 'sse', self.recipient, self.message)) ++ self.assertEqual(1, models.QueuedMessage.query.count()) ++ ++ def test_batch_delta(self): ++ """Assert the batch_delta keyword triggers batching.""" ++ self.assertTrue(tasks._batch({'batch_delta': True}, 'sse', self.recipient, self.message)) ++ self.assertEqual(1, models.QueuedMessage.query.count()) ++ ++ @mock.patch('fmn.tasks.models.QueuedMessage.enqueue') ++ def test_batch_failure(self, mock_enqueue): ++ """Assert a database failure results in the message not being queued.""" ++ mock_enqueue.side_effect = sqlalchemy.exc.SQLAlchemyError('boop') ++ ++ self.assertFalse(tasks._batch({'batch_delta': True}, 'sse', self.recipient, self.message)) ++ self.assertEqual(0, models.QueuedMessage.query.count()) +-- +2.15.1 + diff --git a/0003-Handle-sqlalchemy-errors-in-the-irc-backend.patch b/0003-Handle-sqlalchemy-errors-in-the-irc-backend.patch new file mode 100644 index 0000000..969261a --- /dev/null +++ b/0003-Handle-sqlalchemy-errors-in-the-irc-backend.patch @@ -0,0 +1,285 @@ +From 36663ba36ff06c852107ea0d24ec56a5549f6802 Mon Sep 17 00:00:00 2001 +Message-Id: <36663ba36ff06c852107ea0d24ec56a5549f6802.1515776905.git.jeremy@jcline.org> +In-Reply-To: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org> +References: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org> +From: Jeremy Cline +Date: Thu, 21 Dec 2017 11:02:41 -0500 +Subject: [PATCH 3/3] Handle sqlalchemy errors in the irc backend + +fixes #271 + +Signed-off-by: Jeremy Cline +--- + fmn/delivery/backends/irc.py | 210 +++++++++++++++++++++++++------------------ + 1 file changed, 122 insertions(+), 88 deletions(-) + +diff --git a/fmn/delivery/backends/irc.py b/fmn/delivery/backends/irc.py +index 6564a52..5adf0db 100644 +--- a/fmn/delivery/backends/irc.py ++++ b/fmn/delivery/backends/irc.py +@@ -20,6 +20,7 @@ from bleach import clean + from twisted.internet import ssl, reactor + import twisted.internet.protocol + import twisted.words.protocols.irc ++import sqlalchemy + + from fmn import formatters + from fmn.exceptions import FmnError +@@ -120,26 +121,35 @@ class IRCBackend(BaseBackend): + + def cmd_start(self, nick, message): + log.info("CMD start: %r sent us %r" % (nick, message)) +- sess = fmn.lib.models.init(self.config.get('fmn.sqlalchemy.uri')) +- if self.disabled_for(sess, nick): +- self.enable(sess, nick) +- self.send(nick, "OK") +- else: +- self.send(nick, "Messages not currently stopped. Nothing to do.") +- sess.commit() +- sess.close() ++ session = fmn.lib.models.Session() ++ try: ++ if self.disabled_for(session, nick): ++ self.enable(session, nick) ++ self.send(nick, "OK") ++ else: ++ self.send(nick, "Messages not currently stopped. Nothing to do.") ++ session.commit() ++ except sqlalchemy.exc.SQLAlchemyError: ++ self.send(nick, 'There was a problem contacting the database, please try again later') ++ session.rollback() ++ finally: ++ fmn.lib.models.Session.remove() + + def cmd_stop(self, nick, message): + log.info("CMD stop: %r sent us %r" % (nick, message)) +- sess = fmn.lib.models.init(self.config.get('fmn.sqlalchemy.uri')) + +- if self.disabled_for(sess, nick): +- self.send(nick, "Messages already stopped. Nothing to do.") +- else: +- self.disable(sess, nick) +- self.send(nick, "OK") +- sess.commit() +- sess.close() ++ session = fmn.lib.models.Session() ++ try: ++ if self.disabled_for(session, nick): ++ self.send(nick, "Messages already stopped. Nothing to do.") ++ else: ++ self.disable(session, nick) ++ self.send(nick, "OK") ++ except sqlalchemy.exc.SQLAlchemyError: ++ self.send(nick, 'There was a problem contacting the database, please try again later') ++ session.rollback() ++ finally: ++ fmn.lib.models.Session.remove() + + def cmd_list(self, nick, message): + log.info("CMD list: %r sent us %r" % (nick, message)) +@@ -206,66 +216,71 @@ class IRCBackend(BaseBackend): + log.info("CMD list filters: %r sent us %r" % (nick, message)) + + valid_paths = fmn.lib.load_rules(root="fmn.rules") +- sess = fmn.lib.models.init(self.config.get('fmn.sqlalchemy.uri')) +- pref = self.get_preference(sess, nick) ++ sess = fmn.lib.models.Session.remove() ++ try: ++ pref = self.get_preference(sess, nick) + +- if pref is None: +- self.send(nick, +- 'The nick is not configured with Fedora Notifications') +- sess.close() +- return ++ if pref is None: ++ self.send(nick, ++ 'The nick is not configured with Fedora Notifications') ++ sess.close() ++ return + +- subcmd_string = message.split(None, 2)[-1].lower() ++ subcmd_string = message.split(None, 2)[-1].lower() + +- # Returns the list of filters associated with the rule if the sub +- # command ends with `filters` else it is checked for the validity of +- # the filter name. If the matching filter is found then the list of +- # rules for the filter is returned. If the matching filter is not found +- # then an error message is returned +- if subcmd_string.strip() == 'filters': +- filters = pref.filters +- +- self.send(nick, 'You have {num_filter} filter(s)'.format( +- num_filter=len(filters))) +- +- for filtr in filters: +- self.send(nick, ' - {filtr_name}'.format( +- filtr_name=filtr.name) +- ) +- else: +- subcmd_string = self.dequote(subcmd_string) ++ # Returns the list of filters associated with the rule if the sub ++ # command ends with `filters` else it is checked for the validity of ++ # the filter name. If the matching filter is found then the list of ++ # rules for the filter is returned. If the matching filter is not found ++ # then an error message is returned ++ if subcmd_string.strip() == 'filters': ++ filters = pref.filters + +- try: +- filtr = pref.get_filter_name(sess, subcmd_string) +- except ValueError: +- self.send(nick, 'Not a valid filter') +- sess.close() +- return ++ self.send(nick, 'You have {num_filter} filter(s)'.format( ++ num_filter=len(filters))) + +- rules = filtr.rules +- self.send(nick, +- '{num_rule} matching rules for this filter'.format( +- num_rule=len(rules))) +- +- for rule in rules: +- rule_title = rule.title(valid_paths) +- rule_doc = clean(rule.doc( +- valid_paths, +- no_links=True +- ).replace('\n', ' '), strip=True) +- self.send(nick, rule_template.format( +- rule_title=rule_title, +- rule_doc=rule_doc +- )) +- +- if rule.arguments: +- for key, value in rule.arguments.iteritems(): +- self.send(nick, ' {key} - {value}'.format( +- key=key, +- value=value +- )) +- self.send(nick, '-*'*10) +- sess.close() ++ for filtr in filters: ++ self.send(nick, ' - {filtr_name}'.format( ++ filtr_name=filtr.name) ++ ) ++ else: ++ subcmd_string = self.dequote(subcmd_string) ++ ++ try: ++ filtr = pref.get_filter_name(sess, subcmd_string) ++ except ValueError: ++ self.send(nick, 'Not a valid filter') ++ sess.close() ++ return ++ ++ rules = filtr.rules ++ self.send(nick, ++ '{num_rule} matching rules for this filter'.format( ++ num_rule=len(rules))) ++ ++ for rule in rules: ++ rule_title = rule.title(valid_paths) ++ rule_doc = clean(rule.doc( ++ valid_paths, ++ no_links=True ++ ).replace('\n', ' '), strip=True) ++ self.send(nick, rule_template.format( ++ rule_title=rule_title, ++ rule_doc=rule_doc ++ )) ++ ++ if rule.arguments: ++ for key, value in rule.arguments.iteritems(): ++ self.send(nick, ' {key} - {value}'.format( ++ key=key, ++ value=value ++ )) ++ self.send(nick, '-*'*10) ++ except sqlalchemy.exc.SQLALchemyError: ++ self.send(nick, 'Unable to contact database, please try again later') ++ sess.rollback() ++ finally: ++ fmn.lib.models.Session.remove() + + def cmd_help(self, nick, message): + log.info("CMD help: %r sent us %r" % (nick, message)) +@@ -307,8 +322,6 @@ class IRCBackend(BaseBackend): + self._handle_confirmation(recipient['irc nick']) + return + +- session = fmn.lib.models.Session() +- + if not self.clients: + # This is usually the case if we are suffering a netsplit. + # Raising an exception will cause the message to be requeued and +@@ -323,9 +336,16 @@ class IRCBackend(BaseBackend): + + nickname = recipient['irc nick'] + +- if self.disabled_for(session, detail_value=nickname): +- log.debug("Messages stopped for %r, not sending." % nickname) +- return ++ session = fmn.lib.models.Session() ++ try: ++ if self.disabled_for(session, detail_value=nickname): ++ log.debug("Messages stopped for %r, not sending." % nickname) ++ return ++ except sqlalchemy.exc.SQLAlchemyError: ++ log.exception('Unable to determine if delivery is disabled') ++ session.rollback() ++ finally: ++ fmn.lib.models.Session.remove() + + for client in self.clients: + getattr(client, recipient.get('method', 'msg'))( +@@ -354,20 +374,34 @@ class IRCBackend(BaseBackend): + log.warning("IRCBackend has no clients to work with.") + return + +- confirmations = fmn.lib.models.Confirmation.by_detail( +- session, context="irc", value=nick) +- +- for confirmation in confirmations: +- lines = formatters.irc_confirmation(confirmation).split('\n') +- for line in lines: +- for client in self.clients: +- client.msg(nick.encode('utf-8'), line.encode('utf-8')) ++ session = fmn.lib.models.Session() ++ try: ++ confirmations = fmn.lib.models.Confirmation.by_detail( ++ session, context="irc", value=nick) ++ ++ for confirmation in confirmations: ++ lines = formatters.irc_confirmation(confirmation).split('\n') ++ for line in lines: ++ for client in self.clients: ++ client.msg(nick.encode('utf-8'), line.encode('utf-8')) ++ except sqlalchemy.exc.SQLAlchemyError: ++ session.rollback() ++ raise ++ finally: ++ fmn.lib.models.Session.remove() + + def handle_confirmation_invalid_nick(self, session, nick): +- confirmations = fmn.lib.models.Confirmation.by_detail( +- session, context="irc", value=nick) +- for confirmation in confirmations: +- confirmation.set_status(session, 'invalid') ++ session = fmn.lib.models.Session() ++ try: ++ confirmations = fmn.lib.models.Confirmation.by_detail( ++ session, context="irc", value=nick) ++ for confirmation in confirmations: ++ confirmation.set_status(session, 'invalid') ++ except sqlalchemy.exc.SQLAlchemyError: ++ session.rollback() ++ raise ++ finally: ++ fmn.lib.models.Session.remove() + + def cleanup_clients(self, factory): + self.clients = [c for c in self.clients if c.factory != factory] +-- +2.15.1 + diff --git a/python-fmn.spec b/python-fmn.spec index 0b1b259..80703da 100644 --- a/python-fmn.spec +++ b/python-fmn.spec @@ -5,13 +5,16 @@ %global with_docs 0 Name: python-%{srcname} -Version: 2.0.1 +Version: 2.0.2 Release: 1%{?dist} Summary: A system for generic fedmsg-driven notifications for end users License: LGPLv2+ URL: https://github.com/fedora-infra/%{srcname} Source0: %{url}/archive/%{version}/%{srcname}-%{version}.tar.gz +Patch0: 0001-Refactor-the-tasks-formatting-code.patch +Patch1: 0002-Refactor-the-tasks-to-add-error-handling.patch +Patch2: 0003-Handle-sqlalchemy-errors-in-the-irc-backend.patch BuildArch: noarch %description @@ -187,6 +190,9 @@ PYTHONPATH=$(pwd) py.test %changelog +* Fri Jan 12 2018 Jeremy Cline - 2.0.2-1 +- Update to latest upstream + * Thu Nov 02 2017 Jeremy Cline - 2.0.1-1 - Update to latest upstream diff --git a/sources b/sources index 29e3190..a240f61 100644 --- a/sources +++ b/sources @@ -1 +1 @@ -SHA512 (fmn-2.0.1.tar.gz) = 5a5688e8dc1665659120d6fc3e055b6119d8fa8c2481f612470e26af6ef15e6d4f1aa0b157a29804ce375a2dea09334ec3ae403028d6690fa4cf6d627df1c859 +SHA512 (fmn-2.0.2.tar.gz) = 180ab236c08616c03803d813da3349c7103e1440cd7c65e3b351ee84b4b5d2408d4065660e9d89361c8facf9ea496604142ef3d374366c79f96292f4e28d56e5