Blob Blame History Raw
diff --git a/server/odcs/server/config.py b/server/odcs/server/config.py
index 605c2c7..e15b2d9 100644
--- a/server/odcs/server/config.py
+++ b/server/odcs/server/config.py
@@ -254,7 +254,7 @@ class Config(object):
         'messaging_backend': {
             'type': str,
             'default': '',
-            'desc': 'Messaging backend, fedmsg or umb.'},
+            'desc': 'Messaging backend, fedmsg, rhmsg or fedora-messaging.'},
         'messaging_broker_urls': {
             'type': list,
             'default': [],
diff --git a/server/odcs/server/messaging.py b/server/odcs/server/messaging.py
index f234b1b..f56884e 100644
--- a/server/odcs/server/messaging.py
+++ b/server/odcs/server/messaging.py
@@ -71,11 +71,26 @@ def _fedmsg_send_msg(msgs):
         fedmsg.publish(topic=topic, msg=msg)
 
 
+def _fedora_messaging_send_msg(msgs):
+    """Send message to fedora-messaging."""
+    from fedora_messaging import api, config
+    config.conf.setup_logging()
+
+    for msg in msgs:
+        # "event" is typically just "state-changed"
+        event = msg.get('event', 'event')
+        topic = "compose.%s" % event
+
+        api.publish(api.Message(topic=topic, body=msg))
+
+
 def _get_messaging_backend():
     if conf.messaging_backend == 'rhmsg':
         return _umb_send_msg
     elif conf.messaging_backend == 'fedmsg':
         return _fedmsg_send_msg
+    elif conf.messaging_backend == 'fedora-messaging':
+        return _fedora_messaging_send_msg
     elif conf.messaging_backend:
         raise ValueError(
             'Unknown messaging backend {0}'.format(conf.messaging_backend))
diff --git a/server/tests/test_events.py b/server/tests/test_events.py
index 5847715..99e0335 100644
--- a/server/tests/test_events.py
+++ b/server/tests/test_events.py
@@ -44,6 +44,11 @@ try:
 except ImportError:
     fedmsg = None
 
+try:
+    import fedora_messaging
+except ImportError:
+    fedora_messaging = None
+
 
 @unittest.skipUnless(rhmsg, 'rhmsg is required to run this test case.')
 @unittest.skipIf(six.PY3, 'rhmsg has no Python 3 package so far.')
@@ -152,3 +157,60 @@ class TestFedMsgSendMessageWhenComposeIsCreated(ModelsBaseTest):
         compose.state = COMPOSE_STATES['generating']
 
         self.assert_messaging(compose)
+
+
+@unittest.skipUnless(fedora_messaging, 'fedora_messaging is required to run this test case.')
+class TestFedoraMessagingSendMessageWhenComposeIsCreated(ModelsBaseTest):
+    """Test send message when compose is created"""
+
+    disable_event_handlers = False
+
+    def setUp(self):
+        super(TestFedoraMessagingSendMessageWhenComposeIsCreated, self).setUp()
+
+        # Real lock is not required for running tests
+        self.mock_lock = patch('threading.Lock')
+        self.mock_lock.start()
+
+    def tearDown(self):
+        self.mock_lock.stop()
+
+    def setup_composes(self):
+        self.compose = Compose.create(db.session,
+                                      "mine",
+                                      PungiSourceType.KOJI_TAG,
+                                      "f25",
+                                      COMPOSE_RESULTS["repository"],
+                                      3600)
+        db.session.commit()
+
+    @patch.object(conf, 'messaging_backend', new='fedora-messaging')
+    @patch('fedora_messaging.api.Message')
+    @patch('fedora_messaging.api.publish')
+    def assert_messaging(self, compose, publish, Message):
+        # The db.session.commit() calls on-commit handler which produces the fedora-messaging
+        # message.
+        db.session.commit()
+
+        Message.assert_called_once_with(
+            topic="compose.state-changed",
+            body={'event': 'state-changed', 'compose': compose.json()})
+
+        publish.assert_called_once_with(Message.return_value)
+
+    def test_send_message(self):
+        compose = Compose.create(db.session,
+                                 "me",
+                                 PungiSourceType.MODULE,
+                                 "testmodule-master",
+                                 COMPOSE_RESULTS["repository"],
+                                 3600)
+
+        self.assert_messaging(compose)
+
+    def test_message_on_state_change(self):
+        compose = db.session.query(Compose).filter(
+            Compose.id == self.compose.id).all()[0]
+        compose.state = COMPOSE_STATES['generating']
+
+        self.assert_messaging(compose)