Blob Blame History Raw
From 7dd2fa4d15fe6c2ba28270a2a5617b99b9b9e9bd Mon Sep 17 00:00:00 2001
Message-Id: <7dd2fa4d15fe6c2ba28270a2a5617b99b9b9e9bd.1515776905.git.jeremy@jcline.org>
In-Reply-To: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org>
References: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org>
From: Jeremy Cline <jeremy@jcline.org>
Date: Thu, 21 Dec 2017 09:41:28 -0500
Subject: [PATCH 2/3] Refactor the tasks to add error handling

Add SQLAlchemy error handling to the tasks.

fixes #268

Signed-off-by: Jeremy Cline <jeremy@jcline.org>
---
 fmn/tasks.py            | 262 +++++++++++++++++++++++++++++-------------------
 fmn/tests/test_tasks.py | 120 +++++++++++++++++++++-
 2 files changed, 280 insertions(+), 102 deletions(-)

diff --git a/fmn/tasks.py b/fmn/tasks.py
index 034ac6c..3b6ea35 100644
--- a/fmn/tasks.py
+++ b/fmn/tasks.py
@@ -35,6 +35,7 @@ from celery import task
 import fedmsg
 import fedmsg.meta
 import fedmsg_meta_fedora_infrastructure
+import sqlalchemy
 
 from . import config, lib as fmn_lib, formatters, exceptions
 from . import fmn_fasshim
@@ -183,7 +184,6 @@ class _FindRecipients(task.Task):
                 dictionary. See :func:`fmn.lib.recipients` for the dictionary format.
             message (dict): The raw fedmsg to humanize and deliver to the given recipients.
         """
-        session = models.Session()
         broker_url = config.app_conf['celery']['broker']
 
         with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
@@ -192,19 +192,11 @@ class _FindRecipients(task.Task):
                 _log.info('Dispatching messages for %d recipients for the %s backend',
                           len(recipients), context)
                 for recipient in recipients:
+                    _maybe_mark_filter_fired(recipient)
+
                     user = recipient['user']
                     preference = self.user_preferences['{}_{}'.format(user, context)]
-
-                    if ('filter_oneshot' in recipient and recipient['filter_oneshot']):
-                        _log.info('Marking one-time filter as fired')
-                        idx = recipient['filter_id']
-                        fltr = models.Filter.query.get(idx)
-                        fltr.fired(session)
-
-                    if preference.get('batch_delta') or preference.get('batch_count'):
-                        _log.info('User "%s" has batch delivery set; placing message in database',
-                                  user)
-                        models.QueuedMessage.enqueue(session, user, context, message)
+                    if _batch(preference, context, recipient, message):
                         continue
 
                     formatted_message = _format(context, message, recipient)
@@ -219,7 +211,57 @@ class _FindRecipients(task.Task):
                     routing_key = BACKEND_QUEUE_PREFIX + context
                     producer.publish(backend_message, routing_key=routing_key,
                                      declare=[Queue(routing_key, durable=True)])
-                    session.commit()
+
+
+def _maybe_mark_filter_fired(recipient):
+    """
+    If the filter was a one-shot filter, try to mark it as triggered. If that fails,
+    log the error and continue since there's not much else to be done.
+
+    Args:
+        recipient (dict): The recipient dictionary.
+    """
+
+    if ('filter_oneshot' in recipient and recipient['filter_oneshot']):
+        _log.info('Marking one-time filter as fired')
+        session = models.Session()
+        idx = recipient['filter_id']
+        try:
+            fltr = models.Filter.query.get(idx)
+            fltr.fired(session)
+            session.commit()
+        except (sqlalchemy.exc.SQLAlchemyError, AttributeError):
+            _log.exception('Unable to mark one-shot filter (id %s) as fired', idx)
+            session.rollback()
+        finally:
+            models.Session.remove()
+
+
+def _batch(preference, context, recipient, message):
+    """
+    Batch the message if the user wishes it.
+
+    Args:
+        preference (dict): The user's preferences in dictionary form.
+        context (str): The context to batch it for.
+        recipient (dict): The recipient dictionary.
+        message (dict): The fedmsg to batch.
+    """
+    if preference.get('batch_delta') or preference.get('batch_count'):
+        _log.info('User "%s" has batch delivery set; placing message in database',
+                  recipient['user'])
+        session = models.Session()
+        try:
+            models.QueuedMessage.enqueue(session, recipient['user'], context, message)
+            session.commit()
+            return True
+        except sqlalchemy.exc.SQLAlchemyError:
+            _log.exception('Unable to queue message for batch delivery')
+            session.rollback()
+        finally:
+            models.Session.remove()
+
+    return False
 
 
 def _format(context, message, recipient):
@@ -278,49 +320,56 @@ def batch_messages():
     This is intended to be run as a periodic task using Celery's beat service.
     """
     session = models.Session()
-    broker_url = config.app_conf['celery']['broker']
-    with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
-        producer = conn.Producer()
-        for pref in models.Preference.list_batching(session):
-            if not _batch_ready(pref):
-                continue
-
-            queued_messages = models.QueuedMessage.list_for(
-                session, pref.user, pref.context)
-            _log.info('Batching %d queued messages for %s', len(queued_messages), pref.user.openid)
-
-            messages = [m.message for m in queued_messages]
-            recipients = [
-                {
-                    pref.context.detail_name: value.value,
-                    'user': pref.user.openid,
-                    'markup_messages': pref.markup_messages,
-                    'triggered_by_links': pref.triggered_by_links,
-                    'shorten_links': pref.shorten_links,
-                }
-                for value in pref.detail_values
-            ]
-            for recipient in recipients:
-                try:
-                    formatted_message = _format(pref.context.name, messages, recipient)
-                except exceptions.FmnError:
-                    _log.error('A batch message for %r was not formatted, skipping!',
-                               recipient)
+    try:
+        broker_url = config.app_conf['celery']['broker']
+        with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
+            producer = conn.Producer()
+            for pref in models.Preference.list_batching(session):
+                if not _batch_ready(pref):
                     continue
 
-                backend_message = {
-                    "context": pref.context.name,
-                    "recipient": recipient,
-                    "fedmsg": messages,
-                    "formatted_message": formatted_message,
-                }
-                routing_key = BACKEND_QUEUE_PREFIX + pref.context.name
-                producer.publish(backend_message, routing_key=routing_key,
-                                 declare=[Queue(routing_key, durable=True)])
+                queued_messages = models.QueuedMessage.list_for(
+                    session, pref.user, pref.context)
+                _log.info('Batching %d queued messages for %s',
+                          len(queued_messages), pref.user.openid)
 
-            for message in queued_messages:
-                message.dequeue(session)
-            session.commit()
+                messages = [m.message for m in queued_messages]
+                recipients = [
+                    {
+                        pref.context.detail_name: value.value,
+                        'user': pref.user.openid,
+                        'markup_messages': pref.markup_messages,
+                        'triggered_by_links': pref.triggered_by_links,
+                        'shorten_links': pref.shorten_links,
+                    }
+                    for value in pref.detail_values
+                ]
+                for recipient in recipients:
+                    try:
+                        formatted_message = _format(pref.context.name, messages, recipient)
+                    except exceptions.FmnError:
+                        _log.error('A batch message for %r was not formatted, skipping!',
+                                   recipient)
+                        continue
+
+                    backend_message = {
+                        "context": pref.context.name,
+                        "recipient": recipient,
+                        "fedmsg": messages,
+                        "formatted_message": formatted_message,
+                    }
+                    routing_key = BACKEND_QUEUE_PREFIX + pref.context.name
+                    producer.publish(backend_message, routing_key=routing_key,
+                                     declare=[Queue(routing_key, durable=True)])
+
+                for message in queued_messages:
+                    message.dequeue(session)
+                session.commit()
+    except sqlalchemy.exc.SQLAlchemyError:
+        _log.exception('Failed to dispatch queued messages for delivery')
+        session.rollback()
+    finally:
+        models.Session.remove()
 
 
 def _batch_ready(preference):
@@ -334,23 +383,27 @@ def _batch_ready(preference):
         bool: True if there's a batch ready.
     """
     session = models.Session()
-    count = models.QueuedMessage.count_for(session, preference.user, preference.context)
-    if not count:
-        return False
-
-    # Batch based on count
-    if preference.batch_count is not None and preference.batch_count <= count:
-        _log.info("Sending digest for %r per msg count", preference.user.openid)
-        return True
-
-    # Batch based on time
-    earliest = models.QueuedMessage.earliest_for(
-        session, preference.user, preference.context)
-    now = datetime.datetime.utcnow()
-    delta = datetime.timedelta.total_seconds(now - earliest.created_on)
-    if preference.batch_delta is not None and preference.batch_delta <= delta:
-        _log.info("Sending digest for %r per time delta", preference.user.openid)
-        return True
+    try:
+        count = models.QueuedMessage.count_for(session, preference.user, preference.context)
+        if not count:
+            return False
+
+        # Batch based on count
+        if preference.batch_count is not None and preference.batch_count <= count:
+            _log.info("Sending digest for %r per msg count", preference.user.openid)
+            return True
+
+        # Batch based on time
+        earliest = models.QueuedMessage.earliest_for(
+            session, preference.user, preference.context)
+        now = datetime.datetime.utcnow()
+        delta = datetime.timedelta.total_seconds(now - earliest.created_on)
+        if preference.batch_delta is not None and preference.batch_delta <= delta:
+            _log.info("Sending digest for %r per time delta", preference.user.openid)
+            return True
+    except sqlalchemy.exc.SQLAlchemyError:
+        _log.exception('Failed to determine if the batch is ready for %s', preference.user)
+        session.rollback()
 
     return False
 
@@ -375,37 +428,44 @@ def confirmations():
     This is intended to be dispatched regularly via celery beat.
     """
     session = models.Session()
-    models.Confirmation.delete_expired(session)
-    pending = models.Confirmation.query.filter_by(status='pending').all()
-    broker_url = config.app_conf['celery']['broker']
-    with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
-        producer = conn.Producer()
-        for confirmation in pending:
-            message = None
-            if confirmation.context.name == 'email':
-                message = formatters.email_confirmation(confirmation)
-            else:
-                # The way the irc backend is currently written, it has to format the
-                # confirmation itself. For now, just send an empty message, but in the
-                # future it may be worth refactoring the irc backend to let us format here.
-                message = ''
-            recipient = {
-                confirmation.context.detail_name: confirmation.detail_value,
-                'user': confirmation.user.openid,
-                'triggered_by_links': False,
-                'confirmation': True,
-            }
-            backend_message = {
-                "context": confirmation.context.name,
-                "recipient": recipient,
-                "fedmsg": {},
-                "formatted_message": message,
-            }
-            _log.info('Dispatching confirmation message for %r', confirmation)
-            confirmation.set_status(session, 'valid')
-            routing_key = BACKEND_QUEUE_PREFIX + confirmation.context.name
-            producer.publish(backend_message, routing_key=routing_key,
-                             declare=[Queue(routing_key, durable=True)])
+    try:
+        models.Confirmation.delete_expired(session)
+        pending = models.Confirmation.query.filter_by(status='pending').all()
+        broker_url = config.app_conf['celery']['broker']
+        with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
+            producer = conn.Producer()
+            for confirmation in pending:
+                message = None
+                if confirmation.context.name == 'email':
+                    message = formatters.email_confirmation(confirmation)
+                else:
+                    # The way the irc backend is currently written, it has to format the
+                    # confirmation itself. For now, just send an empty message, but in the
+                    # future it may be worth refactoring the irc backend to let us format here.
+                    message = ''
+                recipient = {
+                    confirmation.context.detail_name: confirmation.detail_value,
+                    'user': confirmation.user.openid,
+                    'triggered_by_links': False,
+                    'confirmation': True,
+                }
+                backend_message = {
+                    "context": confirmation.context.name,
+                    "recipient": recipient,
+                    "fedmsg": {},
+                    "formatted_message": message,
+                }
+                _log.info('Dispatching confirmation message for %r', confirmation)
+                confirmation.set_status(session, 'valid')
+                routing_key = BACKEND_QUEUE_PREFIX + confirmation.context.name
+                producer.publish(backend_message, routing_key=routing_key,
+                                 declare=[Queue(routing_key, durable=True)])
+        session.commit()
+    except sqlalchemy.exc.SQLAlchemyError:
+        _log.exception('Unable to handle confirmations')
+        session.rollback()
+    finally:
+        models.Session.remove()
 
 
 #: A Celery task that accepts a message as input and determines the recipients.
diff --git a/fmn/tests/test_tasks.py b/fmn/tests/test_tasks.py
index 80356b4..5fadde0 100644
--- a/fmn/tests/test_tasks.py
+++ b/fmn/tests/test_tasks.py
@@ -23,6 +23,7 @@ import datetime
 from dogpile import cache
 from kombu import Queue
 import mock
+import sqlalchemy
 
 from fmn import tasks, lib as fmn_lib, constants
 from fmn.exceptions import FmnError
@@ -365,6 +366,7 @@ class FormatTests(Base):
             "user": "jcline.id.fedoraproject.org",
             "verbose": True,
         }
+
     def test_failure(self, mock_formatters):
         """Assert an exception is raised if there's no formatted message."""
         mock_formatters.email.return_value = None
@@ -372,7 +374,6 @@ class FormatTests(Base):
 
         mock_formatters.email.assert_called_once_with(self.message['body'], self.recipient)
 
-
     def test_single_email(self, mock_formatters):
         """Assert single messages for email context use the correct formatter."""
         tasks._format('email', self.message, self.recipient)
@@ -398,3 +399,120 @@ class FormatTests(Base):
 
         mock_formatters.irc_batch.assert_called_once_with(
             [self.message['body'], self.message['body']], self.recipient)
+
+
+class MaybeMarkFilterFiredTests(Base):
+    """Tests for :func:`tasks._maybe_mark_filter_fired`."""
+
+    def test_no_filter_oneshot(self):
+        """Assert if the filter_oneshot key is missing, None is returned."""
+        self.assertTrue(tasks._maybe_mark_filter_fired({}) is None)
+
+    def test_filter_oneshot_false(self):
+        """Assert if the filter_oneshot key is False, None is returned."""
+        self.assertTrue(tasks._maybe_mark_filter_fired({'filter_oneshot': False}) is None)
+
+    @mock.patch('fmn.tasks._log')
+    def test_invalid_filter_id(self, mock_log):
+        """Assert a filter_id with no matching filter is logged and None is returned."""
+        recipient = {'filter_oneshot': True, 'filter_id': 8675309}
+
+        self.assertTrue(tasks._maybe_mark_filter_fired(recipient) is None)
+        mock_log.exception.assert_called_once_with(
+            'Unable to mark one-shot filter (id %s) as fired', 8675309)
+
+    def test_valid_oneshot_filter(self):
+        """Assert oneshot filters are properly deactivated."""
+        session = models.Session()
+        filt = models.Filter(active=True, oneshot=True)
+        session.add(filt)
+        session.commit()
+        recipient = {'filter_oneshot': True, 'filter_id': filt.id}
+
+        self.assertTrue(tasks._maybe_mark_filter_fired(recipient) is None)
+        self.assertEqual(1, models.Filter.query.count())
+        self.assertFalse(models.Filter.query.one().active)
+
+
+class BatchTests(Base):
+    """Tests for :func:`tasks._batch`."""
+
+    def setUp(self):
+        super(BatchTests, self).setUp()
+        user = models.User(
+            openid='jcline.id.fedoraproject.org', openid_url='http://jcline.id.fedoraproject.org')
+        self.sess.add(user)
+        context = models.Context(
+            name='sse', description='description', detail_name='SSE', icon='wat')
+        self.sess.add(context)
+        self.sess.commit()
+        fmn_lib.defaults.create_defaults_for(self.sess, user, detail_values={'sse': 'jcline'})
+        self.sess.commit()
+        preference = models.Preference.query.filter_by(
+            context_name='sse', openid='jcline.id.fedoraproject.org').first()
+        preference.enabled = True
+        self.sess.add(preference)
+        self.sess.commit()
+
+        self. message = {
+            "topic": "org.fedoraproject.prod.buildsys.build.state.change",
+            'body': {
+                "username": "apache",
+                "i": 1,
+                "timestamp": 1505399391.0,
+                "msg_id": "2017-7c65d9ff-85c0-42bb-8288-9b6112cb3da2",
+                "topic": "org.fedoraproject.prod.buildsys.build.state.change",
+                "msg": {
+                    "build_id": 970796,
+                    "old": 0,
+                    "name": "fedmsg",
+                    "task_id": 21861152,
+                    "attribute": "state",
+                    "request": [
+                        ("git://pkgs.fedoraproject.org/rpms/fedmsg?#"
+                         "870987e84539239a22170475bbf13ac4d2ef4382"),
+                        "f26-candidate",
+                        {},
+                    ],
+                    "instance": "primary",
+                    "version": "1.0.1",
+                    "owner": "jcline",
+                    "new": 1,
+                    "release": "4.fc26"
+                }
+            }
+        }
+
+        self.recipient = {
+            'triggered_by_links': True,
+            'markup_messages': False,
+            'user': u'jcline.id.fedoraproject.org',
+            'filter_name': u'Events referring to my username',
+            u'SSE': u'jcline',
+            'filter_oneshot': False,
+            'filter_id': 2,
+            'shorten_links': False,
+            'verbose': True
+        }
+
+    def test_no_batch_delta_or_count(self):
+        """Assert False is returned if the message isn't batched."""
+        self.assertFalse(tasks._batch({}, 'sse', self.recipient, self.message))
+
+    def test_batch_count(self):
+        """Assert the batch_count keyword triggers batching."""
+        self.assertTrue(tasks._batch({'batch_count': True}, 'sse', self.recipient, self.message))
+        self.assertEqual(1, models.QueuedMessage.query.count())
+
+    def test_batch_delta(self):
+        """Assert the batch_delta keyword triggers batching."""
+        self.assertTrue(tasks._batch({'batch_delta': True}, 'sse', self.recipient, self.message))
+        self.assertEqual(1, models.QueuedMessage.query.count())
+
+    @mock.patch('fmn.tasks.models.QueuedMessage.enqueue')
+    def test_batch_failure(self, mock_enqueue):
+        """Assert a database failure results in the message not being queued."""
+        mock_enqueue.side_effect = sqlalchemy.exc.SQLAlchemyError('boop')
+
+        self.assertFalse(tasks._batch({'batch_delta': True}, 'sse', self.recipient, self.message))
+        self.assertEqual(0, models.QueuedMessage.query.count())
-- 
2.15.1