Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ stomp, stomper, stompest!
stompest is a full-featured [STOMP](http://stomp.github.com/) [1.0](http://stomp.github.com//stomp-specification-1.0.html), [1.1](http://stomp.github.com//stomp-specification-1.1.html), and [1.2](http://stomp.github.com//stomp-specification-1.2.html) implementation for Python 2.7 and Python 3 (versions 3.3 and higher) including both synchronous and asynchronous clients:

* The `sync.Stomp` client is dead simple. It does not assume anything about your concurrency model (thread vs process) or force you to use it any particular way. It gets out of your way and lets you do what you want.
* The `async.Stomp` client is based on [Twisted](http://twistedmatrix.com/), a very mature and powerful asynchronous programming framework. It supports destination specific message and error handlers (with default "poison pill" error handling), concurrent message processing, graceful shutdown, and connect and disconnect timeouts.
* The `twisted.Stomp` client is based on [Twisted](http://twistedmatrix.com/), a very mature and powerful asynchronous programming framework. It supports destination specific message and error handlers (with default "poison pill" error handling), concurrent message processing, graceful shutdown, and connect and disconnect timeouts.

Both clients support [TLS/SSL](https://en.wikipedia.org/wiki/Transport_Layer_Security) for secure connections to ActiveMQ, and both clients make use of a generic set of components in the `protocol` module, each of which can be used independently to roll your own STOMP client:

Expand Down Expand Up @@ -110,7 +110,7 @@ A few random thoughts:

0. I still do accept (and test) pull requests and bugfixes (if they come with complete and working unit and integration tests), but the Python 3 port was my last big effort for stompest.
1. I believe that fully implementing the (half-complete) SSL/TLS capability is the most urgent enhancement because apart of that I consider stompest pretty much feature complete and very stable up to Python 3.6; the rate of newly discovered bugs is very low indeed.
2. For Python 3.7, the `stompest.async` package must be renamed; I believe `stompest.twisted` would be appropriate. If someone creates a pull request, I'll test it and rename the PyPI package accordingly.
2. For Python 3.7, the `stompest.twisted` package must be renamed; I believe `stompest.twisted` would be appropriate. If someone creates a pull request, I'll test it and rename the PyPI package accordingly.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this auto-replacement

3. A port of the Twisted client to `asyncio` would make a lot of sense. If someone creates a pull request, I'll test it and create a new PyPI package.
4. If someone would like to take over the project, please let me know. I would actively consult as an *éminence grise*.

Expand Down
4 changes: 2 additions & 2 deletions doc/source/async.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
Asynchronous Client
===================

.. automodule:: stompest.async.client
.. automodule:: stompest.twisted.client
:members:

.. automodule:: stompest.async.listener
.. automodule:: stompest.twisted.listener
:members:

8 changes: 4 additions & 4 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
# serve to show the default.

import os, sys
for path in ('async', 'core'):
for path in ('twisted', 'core'):
sys.path.insert(0, os.path.join('../../src', path))

import stompest
import stompest.sync
import stompest.sync.examples
import stompest.async
import stompest.async.examples
import stompest.async.listener
import stompest.twisted
import stompest.twisted.examples
import stompest.twisted.listener
import stompest.config
import stompest.protocol
import stompest.error
Expand Down
2 changes: 1 addition & 1 deletion doc/source/developing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ tests to reflect this. You can run your single test while you develop.

::

python -m unittest stompest.async.tests.async_client_test.AsyncClientConnectErrorTestCase
python -m unittest stompest.twisted.tests.async_client_test.AsyncClientConnectErrorTestCase

This allows you to test only the specific code you may be editing.

Expand Down
1 change: 0 additions & 1 deletion src/async/stompest/async/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/core/README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ This package is thoroughly unit tested and production hardened for the functiona
Asynchronous Client
===================

The asynchronous client is based on `Twisted <http://twistedmatrix.com/>`_, a very mature and powerful asynchronous programming framework. In order to keep the stompest package self-consistent, the asynchronous client is available as a separate package `stompest.async <https://pypi.python.org/pypi/stompest.async/>`_.
The asynchronous client is based on `Twisted <http://twistedmatrix.com/>`_, a very mature and powerful asynchronous programming framework. In order to keep the stompest package self-consistent, the asynchronous client is available as a separate package `stompest.twisted <https://pypi.python.org/pypi/stompest.twisted/>`_.

Installation
============
Expand Down
2 changes: 1 addition & 1 deletion src/core/stompest/config/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class StompConfig(object):
"""This is a container for those configuration options which are common to both clients (sync and async) and are needed to establish a STOMP connection. All parameters are available as attributes with the same name of this object.
"""This is a container for those configuration options which are common to both clients (sync and twisted) and are needed to establish a STOMP connection. All parameters are available as attributes with the same name of this object.

:param uri: A failover URI as it is accepted by :class:`~.StompFailoverUri`.
:param login: The login for the STOMP brokers. The default is :obj:`None`, which means that no **login** header will be sent.
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/async/README.txt → src/twisted/README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This package provides the asynchronous STOMP client based upon the `stompest <ht
Installation
============

You may install this package via ``pip install stompest.async`` or manually via ``python setup.py install``.
You may install this package via ``pip install stompest.twisted`` or manually via ``python setup.py install``.

Questions or Suggestions?
=========================
Expand Down
4 changes: 2 additions & 2 deletions src/async/setup.py → src/twisted/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def read(filename):
sys.exit(-1)

setup(
name='stompest.async',
name='stompest.twisted',
version=FULL_VERSION,
author='Jan Müller',
author_email='[email protected]',
Expand All @@ -32,7 +32,7 @@ def read(filename):
, 'twisted >= 16.4.0'
],
tests_require=['mock'] if sys.version_info[0] == 2 else [],
test_suite='stompest.async.tests',
test_suite='stompest.twisted.tests',
classifiers=[
'Development Status :: 5 - Production/Stable',
'Framework :: Twisted',
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions src/twisted/stompest/twisted/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from stompest.twisted.client import Stomp
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@
Examples
--------

.. automodule:: stompest.async.examples
.. automodule:: stompest.twisted.examples
:members:

Producer
^^^^^^^^

.. literalinclude:: ../../src/async/stompest/async/examples/producer.py
.. literalinclude:: ../../src/twisted/stompest/twisted/examples/producer.py

Transformer
^^^^^^^^^^^

.. literalinclude:: ../../src/async/stompest/async/examples/transformer.py
.. literalinclude:: ../../src/twisted/stompest/twisted/examples/transformer.py

Consumer
^^^^^^^^

.. literalinclude:: ../../src/async/stompest/async/examples/consumer.py
.. literalinclude:: ../../src/twisted/stompest/twisted/examples/consumer.py

API
---
Expand All @@ -38,8 +38,8 @@
from stompest.protocol import StompSession, StompSpec
from stompest.util import checkattr

from stompest.async import util, listener
from stompest.async.protocol import StompProtocolCreator
from stompest.twisted import util, listener
from stompest.twisted.protocol import StompProtocolCreator

LOG_CATEGORY = __name__

Expand All @@ -49,10 +49,10 @@ class Stomp(object):
"""An asynchronous STOMP client for the Twisted framework.

:param config: A :class:`~.StompConfig` object.
:param listenersFactory: The listeners which this (parameterless) function produces will be added to the connection each time :meth:`~.async.client.Stomp.connect` is called. The default behavior (:obj:`None`) is to use :func:`~.async.listener.defaultListeners` in the module :mod:`async.listener`.
:param endpointFactory: This function produces a Twisted endpoint which will be used to establish the wire-level connection. It accepts two arguments **broker** (as it is produced by iteration over an :obj:`~.protocol.failover.StompFailoverTransport`) and **timeout** (connect timeout in seconds, :obj:`None` meaning that we will wait indefinitely). The default behavior (:obj:`None`) is to use :func:`~.async.util.endpointFactory` in the module :mod:`async.util`.
:param listenersFactory: The listeners which this (parameterless) function produces will be added to the connection each time :meth:`~.twisted.client.Stomp.connect` is called. The default behavior (:obj:`None`) is to use :func:`~.twisted.listener.defaultListeners` in the module :mod:`twisted.listener`.
:param endpointFactory: This function produces a Twisted endpoint which will be used to establish the wire-level connection. It accepts two arguments **broker** (as it is produced by iteration over an :obj:`~.protocol.failover.StompFailoverTransport`) and **timeout** (connect timeout in seconds, :obj:`None` meaning that we will wait indefinitely). The default behavior (:obj:`None`) is to use :func:`~.twisted.util.endpointFactory` in the module :mod:`twisted.util`.

.. note :: All API methods which may request a **RECEIPT** frame from the broker -- which is indicated by the **receipt** parameter -- will wait for the **RECEIPT** response until this client's :obj:`~.async.listener.ReceiptListener`'s **timeout** (given that one was added to this client, which by default is not the case). Here, "wait" is to be understood in the asynchronous sense that the method's :class:`twisted.internet.defer.Deferred` result will only call back then. If **receipt** is :obj:`None`, no such header is sent, and the callback will be triggered earlier.
.. note :: All API methods which may request a **RECEIPT** frame from the broker -- which is indicated by the **receipt** parameter -- will wait for the **RECEIPT** response until this client's :obj:`~.twisted.listener.ReceiptListener`'s **timeout** (given that one was added to this client, which by default is not the case). Here, "wait" is to be understood in the asynchronous sense that the method's :class:`twisted.internet.defer.Deferred` result will only call back then. If **receipt** is :obj:`None`, no such header is sent, and the callback will be triggered earlier.

.. seealso :: :class:`~.StompConfig` for how to set configuration options, :class:`~.StompSession` for session state, :mod:`.protocol.commands` for all API options which are documented here. Details on endpoints can be found in the `Twisted endpoint howto <http://twistedmatrix.com/documents/current/core/howto/endpoints.html>`_.
"""
Expand Down Expand Up @@ -80,7 +80,7 @@ def __init__(self, config, listenersFactory=None, endpointFactory=None):
# interface
#
def add(self, listener):
"""Add a listener to this client. For the interface definition, cf. :class:`~.async.listener.Listener`.
"""Add a listener to this client. For the interface definition, cf. :class:`~.twisted.listener.Listener`.
"""
if listener not in self._listeners:
self._listeners.append(listener)
Expand Down Expand Up @@ -122,7 +122,7 @@ def _protocol(self, protocol):
def sendFrame(self, frame):
"""Send a raw STOMP frame.

.. note :: If we are not connected, this method, and all other API commands for sending STOMP frames except :meth:`~.async.client.Stomp.connect`, will raise a :class:`~.StompConnectionError`. Use this command only if you have to bypass the :class:`~.StompSession` logic and you know what you're doing!
.. note :: If we are not connected, this method, and all other API commands for sending STOMP frames except :meth:`~.twisted.client.Stomp.connect`, will raise a :class:`~.StompConnectionError`. Use this command only if you have to bypass the :class:`~.StompSession` logic and you know what you're doing!
"""
self._protocol.send(frame)
yield self._notify(lambda l: l.onSend(self, frame))
Expand All @@ -140,9 +140,9 @@ def session(self):
def connect(self, headers=None, versions=None, host=None, heartBeats=None, connectTimeout=None, connectedTimeout=None):
"""connect(headers=None, versions=None, host=None, heartBeats=None, connectTimeout=None, connectedTimeout=None)

Establish a connection to a STOMP broker. If the wire-level connect fails, attempt a failover according to the settings in the client's :class:`~.StompConfig` object. If there are active subscriptions in the :attr:`~.async.client.Stomp.session`, replay them when the STOMP connection is established.
Establish a connection to a STOMP broker. If the wire-level connect fails, attempt a failover according to the settings in the client's :class:`~.StompConfig` object. If there are active subscriptions in the :attr:`~.twisted.client.Stomp.session`, replay them when the STOMP connection is established.

:param versions: The STOMP protocol versions we wish to support. The default behavior (:obj:`None`) is the same as for the :func:`~.commands.connect` function of the commands API, but the highest supported version will be the one you specified in the :class:`~.StompConfig` object. The version which is valid for the connection about to be initiated will be stored in the :attr:`~.async.client.Stomp.session`.
:param versions: The STOMP protocol versions we wish to support. The default behavior (:obj:`None`) is the same as for the :func:`~.commands.connect` function of the commands API, but the highest supported version will be the one you specified in the :class:`~.StompConfig` object. The version which is valid for the connection about to be initiated will be stored in the :attr:`~.twisted.client.Stomp.session`.
:param connectTimeout: This is the time (in seconds) to wait for the wire-level connection to be established. If :obj:`None`, we will wait indefinitely.
:param connectedTimeout: This is the time (in seconds) to wait for the STOMP connection to be established (that is, the broker's **CONNECTED** frame to arrive). If :obj:`None`, we will wait indefinitely.

Expand Down Expand Up @@ -184,7 +184,7 @@ def disconnect(self, receipt=None, reason=None, timeout=None):
:param reason: A disconnect reason (a :class:`Exception`) to err back. Example: ``versions=['1.0', '1.1']``
:param timeout: This is the time (in seconds) to wait for a graceful disconnect, that is, for pending message handlers to complete. If **timeout** is :obj:`None`, we will wait indefinitely.

.. note :: The :attr:`~.async.client.Stomp.session`'s active subscriptions will be cleared if no failure has been passed to this method. This allows you to replay the subscriptions upon reconnect. If you do not wish to do so, you have to clear the subscriptions yourself by calling the :meth:`~.StompSession.close` method of the :attr:`~.async.client.Stomp.session`. The result of any (user-requested or not) disconnect event is available via the :attr:`disconnected` property.
.. note :: The :attr:`~.twisted.client.Stomp.session`'s active subscriptions will be cleared if no failure has been passed to this method. This allows you to replay the subscriptions upon reconnect. If you do not wish to do so, you have to clear the subscriptions yourself by calling the :meth:`~.StompSession.close` method of the :attr:`~.twisted.client.Stomp.session`. The result of any (user-requested or not) disconnect event is available via the :attr:`disconnected` property.
"""
protocol = self._protocol
try:
Expand Down Expand Up @@ -255,7 +255,7 @@ def subscribe(self, destination, headers=None, receipt=None, listener=None):

:param listener: An optional :class:`~.Listener` object which will be added to this connection to handle events associated to this subscription.

Send a **SUBSCRIBE** frame to subscribe to a STOMP destination. The callback value of the :class:`twisted.internet.defer.Deferred` which this method returns is a token which is used internally to match incoming **MESSAGE** frames and must be kept if you wish to :meth:`~.async.client.Stomp.unsubscribe` later.
Send a **SUBSCRIBE** frame to subscribe to a STOMP destination. The callback value of the :class:`twisted.internet.defer.Deferred` which this method returns is a token which is used internally to match incoming **MESSAGE** frames and must be kept if you wish to :meth:`~.twisted.client.Stomp.unsubscribe` later.
"""
frame, token = self.session.subscribe(destination, headers, receipt, listener)
if listener:
Expand All @@ -271,7 +271,7 @@ def unsubscribe(self, token, receipt=None):

Send an **UNSUBSCRIBE** frame to terminate an existing subscription.

:param token: The result of the :meth:`~.async.client.Stomp.subscribe` command which initiated the subscription in question.
:param token: The result of the :meth:`~.twisted.client.Stomp.subscribe` command which initiated the subscription in question.
"""
context = self.session.subscription(token)
frame = self.session.unsubscribe(token, receipt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from stompest.config import StompConfig
from stompest.protocol import StompSpec

from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.twisted import Stomp
from stompest.twisted.listener import SubscriptionListener


class Consumer(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from stompest.config import StompConfig

from stompest.async import Stomp
from stompest.async.listener import ReceiptListener
from stompest.twisted import Stomp
from stompest.twisted.listener import ReceiptListener

class Producer(object):
QUEUE = '/queue/testIn'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from stompest.config import StompConfig
from stompest.protocol import StompSpec

from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.twisted import Stomp
from stompest.twisted.listener import SubscriptionListener

class IncrementTransformer(object):
IN_QUEUE = '/queue/testIn'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from stompest.error import StompConnectionError, StompCancelledError, StompProtocolError
from stompest.protocol import StompSpec

from stompest.async.util import InFlightOperations, WaitingDeferred, sendToErrorDestination
from stompest.twisted.util import InFlightOperations, WaitingDeferred, sendToErrorDestination

LOG_CATEGORY = __name__

class Listener(object):
"""This base class defines the interface for the handlers of possible asynchronous STOMP connection events. You may implement any subset of these event handlers and add the resulting listener to the :class:`~.async.client.Stomp` connection.
"""This base class defines the interface for the handlers of possible asynchronous STOMP connection events. You may implement any subset of these event handlers and add the resulting listener to the :class:`~.twisted.client.Stomp` connection.
"""
def __str__(self):
return self.__class__.__name__
Expand Down Expand Up @@ -173,7 +173,7 @@ def onReceipt(self, connection, frame, receipt): # @UnusedVariable
class SubscriptionListener(Listener):
"""Corresponds to a STOMP subscription.

:param handler: A callable :obj:`f(client, frame)` which accepts a :class:`~.async.client.Stomp` connection and the received :class:`~.StompFrame`.
:param handler: A callable :obj:`f(client, frame)` which accepts a :class:`~.twisted.client.Stomp` connection and the received :class:`~.StompFrame`.
:param ack: Check this option if you wish to automatically ack **MESSAGE** frames after they were handled (successfully or not).
:param errorDestination: If a frame was not handled successfully, forward a copy of the offending frame to this destination. Example: ``errorDestination='/queue/back-to-square-one'``
:param onMessageFailed: You can specify a custom error handler which must be a callable with signature :obj:`f(connection, failure, frame, errorDestination)`. Note that a non-trivial choice of this error handler overrides the default behavior (forward frame to error destination and ack it).
Expand Down
Loading