From 7dd2fa4d15fe6c2ba28270a2a5617b99b9b9e9bd Mon Sep 17 00:00:00 2001
Message-Id: <7dd2fa4d15fe6c2ba28270a2a5617b99b9b9e9bd.1515776905.git.jeremy@jcline.org>
In-Reply-To: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org>
References: <9dea69fda2bb6168a1a7bd57b11ca4f28e8e6a73.1515776904.git.jeremy@jcline.org>
From: Jeremy Cline <jeremy@jcline.org>
Date: Thu, 21 Dec 2017 09:41:28 -0500
Subject: [PATCH 2/3] Refactor the tasks to add error handling
Add SQLAlchemy error handling to the tasks.
fixes #268
Signed-off-by: Jeremy Cline <jeremy@jcline.org>
---
fmn/tasks.py | 262 +++++++++++++++++++++++++++++-------------------
fmn/tests/test_tasks.py | 120 +++++++++++++++++++++-
2 files changed, 280 insertions(+), 102 deletions(-)
diff --git a/fmn/tasks.py b/fmn/tasks.py
index 034ac6c..3b6ea35 100644
--- a/fmn/tasks.py
+++ b/fmn/tasks.py
@@ -35,6 +35,7 @@ from celery import task
import fedmsg
import fedmsg.meta
import fedmsg_meta_fedora_infrastructure
+import sqlalchemy
from . import config, lib as fmn_lib, formatters, exceptions
from . import fmn_fasshim
@@ -183,7 +184,6 @@ class _FindRecipients(task.Task):
dictionary. See :func:`fmn.lib.recipients` for the dictionary format.
message (dict): The raw fedmsg to humanize and deliver to the given recipients.
"""
- session = models.Session()
broker_url = config.app_conf['celery']['broker']
with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
@@ -192,19 +192,11 @@ class _FindRecipients(task.Task):
_log.info('Dispatching messages for %d recipients for the %s backend',
len(recipients), context)
for recipient in recipients:
+ _maybe_mark_filter_fired(recipient)
+
user = recipient['user']
preference = self.user_preferences['{}_{}'.format(user, context)]
-
- if ('filter_oneshot' in recipient and recipient['filter_oneshot']):
- _log.info('Marking one-time filter as fired')
- idx = recipient['filter_id']
- fltr = models.Filter.query.get(idx)
- fltr.fired(session)
-
- if preference.get('batch_delta') or preference.get('batch_count'):
- _log.info('User "%s" has batch delivery set; placing message in database',
- user)
- models.QueuedMessage.enqueue(session, user, context, message)
+ if _batch(preference, context, recipient, message):
continue
formatted_message = _format(context, message, recipient)
@@ -219,7 +211,57 @@ class _FindRecipients(task.Task):
routing_key = BACKEND_QUEUE_PREFIX + context
producer.publish(backend_message, routing_key=routing_key,
declare=[Queue(routing_key, durable=True)])
- session.commit()
+
+
+def _maybe_mark_filter_fired(recipient):
+ """
+ If the filter was a one-shot filter, try to mark it as triggered. If that fails,
+ log the error and continue since there's not much else to be done.
+
+ Args:
+ recipient (dict): The recipient dictionary.
+ """
+
+ if ('filter_oneshot' in recipient and recipient['filter_oneshot']):
+ _log.info('Marking one-time filter as fired')
+ session = models.Session()
+ idx = recipient['filter_id']
+ try:
+ fltr = models.Filter.query.get(idx)
+ fltr.fired(session)
+ session.commit()
+ except (sqlalchemy.exc.SQLAlchemyError, AttributeError):
+ _log.exception('Unable to mark one-shot filter (id %s) as fired', idx)
+ session.rollback()
+ finally:
+ models.Session.remove()
+
+
+def _batch(preference, context, recipient, message):
+ """
+ Batch the message if the user wishes it.
+
+ Args:
+ preference (dict): The user's preferences in dictionary form.
+ context (str): The context to batch it for.
+ recipient (dict): The recipient dictionary.
+ message (dict): The fedmsg to batch.
+ """
+ if preference.get('batch_delta') or preference.get('batch_count'):
+ _log.info('User "%s" has batch delivery set; placing message in database',
+ recipient['user'])
+ session = models.Session()
+ try:
+ models.QueuedMessage.enqueue(session, recipient['user'], context, message)
+ session.commit()
+ return True
+ except sqlalchemy.exc.SQLAlchemyError:
+ _log.exception('Unable to queue message for batch delivery')
+ session.rollback()
+ finally:
+ models.Session.remove()
+
+ return False
def _format(context, message, recipient):
@@ -278,49 +320,56 @@ def batch_messages():
This is intended to be run as a periodic task using Celery's beat service.
"""
session = models.Session()
- broker_url = config.app_conf['celery']['broker']
- with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
- producer = conn.Producer()
- for pref in models.Preference.list_batching(session):
- if not _batch_ready(pref):
- continue
-
- queued_messages = models.QueuedMessage.list_for(
- session, pref.user, pref.context)
- _log.info('Batching %d queued messages for %s', len(queued_messages), pref.user.openid)
-
- messages = [m.message for m in queued_messages]
- recipients = [
- {
- pref.context.detail_name: value.value,
- 'user': pref.user.openid,
- 'markup_messages': pref.markup_messages,
- 'triggered_by_links': pref.triggered_by_links,
- 'shorten_links': pref.shorten_links,
- }
- for value in pref.detail_values
- ]
- for recipient in recipients:
- try:
- formatted_message = _format(pref.context.name, messages, recipient)
- except exceptions.FmnError:
- _log.error('A batch message for %r was not formatted, skipping!',
- recipient)
+ try:
+ broker_url = config.app_conf['celery']['broker']
+ with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
+ producer = conn.Producer()
+ for pref in models.Preference.list_batching(session):
+ if not _batch_ready(pref):
continue
- backend_message = {
- "context": pref.context.name,
- "recipient": recipient,
- "fedmsg": messages,
- "formatted_message": formatted_message,
- }
- routing_key = BACKEND_QUEUE_PREFIX + pref.context.name
- producer.publish(backend_message, routing_key=routing_key,
- declare=[Queue(routing_key, durable=True)])
+ queued_messages = models.QueuedMessage.list_for(
+ session, pref.user, pref.context)
+ _log.info('Batching %d queued messages for %s',
+ len(queued_messages), pref.user.openid)
- for message in queued_messages:
- message.dequeue(session)
- session.commit()
+ messages = [m.message for m in queued_messages]
+ recipients = [
+ {
+ pref.context.detail_name: value.value,
+ 'user': pref.user.openid,
+ 'markup_messages': pref.markup_messages,
+ 'triggered_by_links': pref.triggered_by_links,
+ 'shorten_links': pref.shorten_links,
+ }
+ for value in pref.detail_values
+ ]
+ for recipient in recipients:
+ try:
+ formatted_message = _format(pref.context.name, messages, recipient)
+ except exceptions.FmnError:
+ _log.error('A batch message for %r was not formatted, skipping!',
+ recipient)
+ continue
+
+ backend_message = {
+ "context": pref.context.name,
+ "recipient": recipient,
+ "fedmsg": messages,
+ "formatted_message": formatted_message,
+ }
+ routing_key = BACKEND_QUEUE_PREFIX + pref.context.name
+ producer.publish(backend_message, routing_key=routing_key,
+ declare=[Queue(routing_key, durable=True)])
+
+ for message in queued_messages:
+ message.dequeue(session)
+ session.commit()
+ except sqlalchemy.exc.SQLAlchemyError:
+ _log.exception('Failed to dispatch queued messages for delivery')
+ session.rollback()
+ finally:
+ models.Session.remove()
def _batch_ready(preference):
@@ -334,23 +383,27 @@ def _batch_ready(preference):
bool: True if there's a batch ready.
"""
session = models.Session()
- count = models.QueuedMessage.count_for(session, preference.user, preference.context)
- if not count:
- return False
-
- # Batch based on count
- if preference.batch_count is not None and preference.batch_count <= count:
- _log.info("Sending digest for %r per msg count", preference.user.openid)
- return True
-
- # Batch based on time
- earliest = models.QueuedMessage.earliest_for(
- session, preference.user, preference.context)
- now = datetime.datetime.utcnow()
- delta = datetime.timedelta.total_seconds(now - earliest.created_on)
- if preference.batch_delta is not None and preference.batch_delta <= delta:
- _log.info("Sending digest for %r per time delta", preference.user.openid)
- return True
+ try:
+ count = models.QueuedMessage.count_for(session, preference.user, preference.context)
+ if not count:
+ return False
+
+ # Batch based on count
+ if preference.batch_count is not None and preference.batch_count <= count:
+ _log.info("Sending digest for %r per msg count", preference.user.openid)
+ return True
+
+ # Batch based on time
+ earliest = models.QueuedMessage.earliest_for(
+ session, preference.user, preference.context)
+ now = datetime.datetime.utcnow()
+ delta = datetime.timedelta.total_seconds(now - earliest.created_on)
+ if preference.batch_delta is not None and preference.batch_delta <= delta:
+ _log.info("Sending digest for %r per time delta", preference.user.openid)
+ return True
+ except sqlalchemy.exc.SQLAlchemyError:
+ _log.exception('Failed to determine if the batch is ready for %s', preference.user)
+ session.rollback()
return False
@@ -375,37 +428,44 @@ def confirmations():
This is intended to be dispatched regularly via celery beat.
"""
session = models.Session()
- models.Confirmation.delete_expired(session)
- pending = models.Confirmation.query.filter_by(status='pending').all()
- broker_url = config.app_conf['celery']['broker']
- with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
- producer = conn.Producer()
- for confirmation in pending:
- message = None
- if confirmation.context.name == 'email':
- message = formatters.email_confirmation(confirmation)
- else:
- # The way the irc backend is currently written, it has to format the
- # confirmation itself. For now, just send an empty message, but in the
- # future it may be worth refactoring the irc backend to let us format here.
- message = ''
- recipient = {
- confirmation.context.detail_name: confirmation.detail_value,
- 'user': confirmation.user.openid,
- 'triggered_by_links': False,
- 'confirmation': True,
- }
- backend_message = {
- "context": confirmation.context.name,
- "recipient": recipient,
- "fedmsg": {},
- "formatted_message": message,
- }
- _log.info('Dispatching confirmation message for %r', confirmation)
- confirmation.set_status(session, 'valid')
- routing_key = BACKEND_QUEUE_PREFIX + confirmation.context.name
- producer.publish(backend_message, routing_key=routing_key,
- declare=[Queue(routing_key, durable=True)])
+ try:
+ models.Confirmation.delete_expired(session)
+ pending = models.Confirmation.query.filter_by(status='pending').all()
+ broker_url = config.app_conf['celery']['broker']
+ with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn:
+ producer = conn.Producer()
+ for confirmation in pending:
+ message = None
+ if confirmation.context.name == 'email':
+ message = formatters.email_confirmation(confirmation)
+ else:
+ # The way the irc backend is currently written, it has to format the
+ # confirmation itself. For now, just send an empty message, but in the
+ # future it may be worth refactoring the irc backend to let us format here.
+ message = ''
+ recipient = {
+ confirmation.context.detail_name: confirmation.detail_value,
+ 'user': confirmation.user.openid,
+ 'triggered_by_links': False,
+ 'confirmation': True,
+ }
+ backend_message = {
+ "context": confirmation.context.name,
+ "recipient": recipient,
+ "fedmsg": {},
+ "formatted_message": message,
+ }
+ _log.info('Dispatching confirmation message for %r', confirmation)
+ confirmation.set_status(session, 'valid')
+ routing_key = BACKEND_QUEUE_PREFIX + confirmation.context.name
+ producer.publish(backend_message, routing_key=routing_key,
+ declare=[Queue(routing_key, durable=True)])
+ session.commit()
+ except sqlalchemy.exc.SQLAlchemyError:
+ _log.exception('Unable to handle confirmations')
+ session.rollback()
+ finally:
+ models.Session.remove()
#: A Celery task that accepts a message as input and determines the recipients.
diff --git a/fmn/tests/test_tasks.py b/fmn/tests/test_tasks.py
index 80356b4..5fadde0 100644
--- a/fmn/tests/test_tasks.py
+++ b/fmn/tests/test_tasks.py
@@ -23,6 +23,7 @@ import datetime
from dogpile import cache
from kombu import Queue
import mock
+import sqlalchemy
from fmn import tasks, lib as fmn_lib, constants
from fmn.exceptions import FmnError
@@ -365,6 +366,7 @@ class FormatTests(Base):
"user": "jcline.id.fedoraproject.org",
"verbose": True,
}
+
def test_failure(self, mock_formatters):
"""Assert an exception is raised if there's no formatted message."""
mock_formatters.email.return_value = None
@@ -372,7 +374,6 @@ class FormatTests(Base):
mock_formatters.email.assert_called_once_with(self.message['body'], self.recipient)
-
def test_single_email(self, mock_formatters):
"""Assert single messages for email context use the correct formatter."""
tasks._format('email', self.message, self.recipient)
@@ -398,3 +399,120 @@ class FormatTests(Base):
mock_formatters.irc_batch.assert_called_once_with(
[self.message['body'], self.message['body']], self.recipient)
+
+
+class MaybeMarkFilterFiredTests(Base):
+ """Tests for :func:`tasks._maybe_mark_filter_fired`."""
+
+ def test_no_filter_oneshot(self):
+ """Assert if the filter_oneshot key is missing, None is returned."""
+ self.assertTrue(tasks._maybe_mark_filter_fired({}) is None)
+
+ def test_filter_oneshot_false(self):
+ """Assert if the filter_oneshot key is missing, None is returned."""
+ self.assertTrue(tasks._maybe_mark_filter_fired({'filter_oneshot': False}) is None)
+
+ @mock.patch('fmn.tasks._log')
+ def test_invalid_filter_id(self, mock_log):
+ """Assert if the filter_id references an invalid key, nothing bad happens."""
+ recipient = {'filter_oneshot': True, 'filter_id': 8675309}
+
+ self.assertTrue(tasks._maybe_mark_filter_fired(recipient) is None)
+ mock_log.exception.assert_called_once_with(
+ 'Unable to mark one-shot filter (id %s) as fired', 8675309)
+
+ def test_valid_oneshot_filter(self):
+ """Assert oneshot filters are properly deactivated."""
+ session = models.Session()
+ filt = models.Filter(active=True, oneshot=True)
+ session.add(filt)
+ session.commit()
+ recipient = {'filter_oneshot': True, 'filter_id': filt.id}
+
+ self.assertTrue(tasks._maybe_mark_filter_fired(recipient) is None)
+ self.assertEqual(1, models.Filter.query.count())
+ self.assertFalse(models.Filter.query.one().active)
+
+
+class BatchTests(Base):
+ """Tests for :func:`tasks._batch`."""
+
+ def setUp(self):
+ super(BatchTests, self).setUp()
+ user = models.User(
+ openid='jcline.id.fedoraproject.org', openid_url='http://jcline.id.fedoraproject.org')
+ self.sess.add(user)
+ context = models.Context(
+ name='sse', description='description', detail_name='SSE', icon='wat')
+ self.sess.add(context)
+ self.sess.commit()
+ fmn_lib.defaults.create_defaults_for(self.sess, user, detail_values={'sse': 'jcline'})
+ self.sess.commit()
+ preference = models.Preference.query.filter_by(
+ context_name='sse', openid='jcline.id.fedoraproject.org').first()
+ preference.enabled = True
+ self.sess.add(preference)
+ self.sess.commit()
+
+ self. message = {
+ "topic": "org.fedoraproject.prod.buildsys.build.state.change",
+ 'body': {
+ "username": "apache",
+ "i": 1,
+ "timestamp": 1505399391.0,
+ "msg_id": "2017-7c65d9ff-85c0-42bb-8288-9b6112cb3da2",
+ "topic": "org.fedoraproject.prod.buildsys.build.state.change",
+ "msg": {
+ "build_id": 970796,
+ "old": 0,
+ "name": "fedmsg",
+ "task_id": 21861152,
+ "attribute": "state",
+ "request": [
+ ("git://pkgs.fedoraproject.org/rpms/fedmsg?#"
+ "870987e84539239a22170475bbf13ac4d2ef4382"),
+ "f26-candidate",
+ {},
+ ],
+ "instance": "primary",
+ "version": "1.0.1",
+ "owner": "jcline",
+ "new": 1,
+ "release": "4.fc26"
+ }
+ }
+ }
+
+ self.recipient = {
+ 'triggered_by_links': True,
+ 'markup_messages': False,
+ 'user': u'jcline.id.fedoraproject.org',
+ 'filter_name': u'Events referring to my username',
+ u'SSE': u'jcline',
+ 'filter_oneshot': False,
+ 'filter_id': 2,
+ 'shorten_links': False,
+ 'verbose': True
+ }
+
+ def test_no_batch_delta_or_count(self):
+ """Assert False is returned if the message isn't batched."""
+ self.assertFalse(tasks._batch({}, 'sse', self.recipient, self.message))
+
+ def test_batch_count(self):
+ """Assert the batch_count keyword triggers batching."""
+ self.assertTrue(tasks._batch({'batch_count': True}, 'sse', self.recipient, self.message))
+ self.assertEqual(1, models.QueuedMessage.query.count())
+
+ def test_batch_delta(self):
+ """Assert the batch_delta keyword triggers batching."""
+ self.assertTrue(tasks._batch({'batch_delta': True}, 'sse', self.recipient, self.message))
+ self.assertEqual(1, models.QueuedMessage.query.count())
+
+ @mock.patch('fmn.tasks.models.QueuedMessage.enqueue')
+ def test_batch_failure(self, mock_enqueue):
+ """Assert a database failure results in the message not being queued."""
+ mock_enqueue.side_effect = sqlalchemy.exc.SQLAlchemyError('boop')
+
+ self.assertFalse(tasks._batch({'batch_delta': True}, 'sse', self.recipient, self.message))
+ self.assertEqual(0, models.QueuedMessage.query.count())
--
2.15.1