Eventing RabbitMQ Source

The RabbitMQ source translates messages on a RabbitMQ exchange to CloudEvents based on the RabbitMQ Protocol Binding for CloudEvents Spec, which can then be used with Knative Eventing over HTTP. The source can bind to an existing RabbitMQ exchange, or create a new exchange if required.

Prerequisites

Install using the RabbitMQ Message Topology Operator and RabbitMQ Cluster Operator (Recommended)

With both operators installed, create a RabbitMQ cluster, for example:

kubectl apply -f - << EOF
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq
  namespace: default
spec:
  replicas: 1
EOF

Install without the RabbitMQ Cluster Operator

  • You will need a running RabbitMQ instance that is accessible via a URL or IP address

  • Install everything except the RabbitMQ Cluster Operator

  • Note: An external RabbitMQ instance can be used, but if you want to use the Source without predeclared resources (specifically the Exchange and Queue), the RabbitMQ Message Topology Operator needs to be installed in the same Kubernetes Cluster as the Source.

Installation

You can install the latest released version of the Knative RabbitMQ Source:

kubectl apply --filename https://github.com/knative-extensions/eventing-rabbitmq/releases/latest/download/rabbitmq-source.yaml

To install a specific version, for example v0.25.0, run:

kubectl apply --filename https://github.com/knative-extensions/eventing-rabbitmq/releases/download/v0.25.0/rabbitmq-source.yaml

You can install a nightly version:

kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-rabbitmq/latest/rabbitmq-source.yaml

To run the latest version from this repo instead, use ko to install it:

  • Install the ko CLI for building and deploying purposes.

    go install github.com/google/ko@latest
  • Configure access to a container registry, such as a Docker Hub account.

  • Export the KO_DOCKER_REPO environment variable with a value denoting the container registry to use.

    export KO_DOCKER_REPO="docker.io/YOUR_REPO"
  • Install the source operator

    ko apply -f config/source/
    

You can now create a RabbitMQ source in the default namespace by running:

kubectl apply -f - << EOF
apiVersion: sources.knative.dev/v1alpha1
kind: RabbitmqSource
metadata:
  name: rabbitmq-source
spec:
  rabbitmqClusterReference:
    # Set either name or connectionSecret, not both.
    name: rabbitmq # name of the RabbitmqCluster created in Prerequisites
    namespace: default
    # connectionSecret:
    #   name: test-secret
  rabbitmqResourcesConfig:
    exchangeName: "eventing-rabbitmq-source"
    queueName: "eventing-rabbitmq-source"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: rabbitmq-source-sink
      namespace: source-demo
EOF

Published Events

All messages received by the source are published in binary mode. A message that is already a CloudEvent is forwarded in binary mode as well. Any other message is wrapped in a CloudEvent with the attributes below:

Event attributes

Attribute | Value | Notes
type | dev.knative.rabbitmq.event |
source | /apis/v1/namespaces/$NS/rabbitmqsources/$NAME#$QUEUE_NAME | NS, NAME and QUEUE_NAME are derived from the source configuration
id | A unique ID | Uses the MessageId if available, and a UUID otherwise
subject | The ID of the message | Empty string if no message ID is present
content-type | Extracted from the RabbitMQ MessageContentType | Any valid Content Type
time | A timestamp | Uses the MessageTimestamp if available

The payload of the event is set to the data content of the message.

Samples

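For a message published with the payload "Hello rabbitmq!", for example with rabbitmqadmin (routing_key, which rabbitmqadmin requires, is the name of the queue the source consumes from):

rabbitmqadmin publish exchange=amq.default routing_key=eventing-rabbitmq-source payload="Hello rabbitmq!"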

The source sends the following event content:

CloudEvents JSON format:

{
  "specversion": "1.0",
  "type": "dev.knative.rabbitmq.event",
  "source": "/apis/v1/namespaces/default/rabbitmqsources/rabbitmq-source",
  "id": "f00c1f52-33a1-4d3d-993f-750f20c804da",
  "time": "2020-12-18T01:15:20.450860898Z",
  "subject": "f00c1f52-33a1-4d3d-993f-750f20c804da",
  "datacontenttype": "application/json",
  "data": "Hello rabbitmq!"
}
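
To exercise a sink without a running source, the binary-mode delivery described above can be approximated with curl. This is only a sketch, not the request the source actually sends: the ce-* header names come from the CloudEvents HTTP protocol binding, the attribute values are copied from the sample above, and the function name send_sample_event and its URL argument are hypothetical. The real receive adapter may send additional headers.

```shell
#!/bin/sh
# Sketch: POST a binary-mode CloudEvent that mirrors the sample above.
# send_sample_event and its URL argument are illustrative names only.
send_sample_event() {
  sink_url="$1"
  # In binary mode the attributes travel as ce-* headers and the
  # event data is the HTTP body (here the JSON string from the sample).
  curl --silent --show-error --request POST "$sink_url" \
    --header "ce-specversion: 1.0" \
    --header "ce-type: dev.knative.rabbitmq.event" \
    --header "ce-source: /apis/v1/namespaces/default/rabbitmqsources/rabbitmq-source" \
    --header "ce-id: f00c1f52-33a1-4d3d-993f-750f20c804da" \
    --header "ce-time: 2020-12-18T01:15:20.450860898Z" \
    --header "ce-subject: f00c1f52-33a1-4d3d-993f-750f20c804da" \
    --header "content-type: application/json" \
    --data '"Hello rabbitmq!"'
}
```

For example, send_sample_event http://localhost:8080 posts the event to a sink listening locally, such as one reached through kubectl port-forward.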

Creating and Managing Sources

Sources are Kubernetes objects. In addition to the standard Kubernetes apiVersion, kind, and metadata, they have the following spec fields:

Source parameters

Field | Value
rabbitmqClusterReference | A reference to a RabbitMQ Cluster
rabbitmqClusterReference.name | The name of the RabbitMQ Cluster CRD. If this is set, connectionSecret must not be set
rabbitmqClusterReference.connectionSecret.name | The RabbitMQ Connection Secret name. The secret must contain uri, username and password. It may optionally contain port, which defaults to 5672. If this is set, rabbitmqClusterReference.name must not be set
rabbitmqClusterReference.namespace | The namespace where the RabbitMQ Cluster CRD lives
rabbitmqResourcesConfig | Stores all the configuration parameters related to RabbitMQ resources created by the Source
rabbitmqResourcesConfig.vhost * | VHost where the source RabbitMQ resources are located
rabbitmqResourcesConfig.predeclared | Defines whether the source should try to create a new queue or use a predeclared one (Boolean)
rabbitmqResourcesConfig.exchangeName | Name of the Exchange
rabbitmqResourcesConfig.queueName | Name of the Queue
rabbitmqResourcesConfig.parallelism * | Int that sets the Consumer Prefetch Value and creates n parallel consumer processes. The default value is 1, and the value must be between 1 and 1000. With a value of 1 the RabbitMQ Source processes events in FIFO order. Values above 1 break message ordering guarantees in exchange for higher throughput
delivery | Stores the backoff strategy and retry configuration for the RabbitMQ Source
delivery.backoffPolicy | The backoff policy type: linear or exponential (String)
delivery.backoffDelay | The delay used by the backoff policy before retrying (String)
delivery.retry | Number of retries to be used by the backoff policy (Int)
sink | A reference to an Addressable Kubernetes object
serviceAccountName * | The service account name to be used by the Receive Adapter deployments generated by the Source

* These attributes are optional.

You will need a Kubernetes Secret to hold the RabbitMQ username and password. The RabbitMQ Cluster Operator creates a default one.

  • To create and edit the Source's default user secret and add the RabbitMQ HTTP URI, see the Source's samples README
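
As a rough sketch of a connection secret holding the keys listed in the Source parameters table (uri, username, password and the optional port), a manifest could be written out as follows. The secret name test-secret matches the earlier example. The file name and every value are placeholders, and the exact URI form the Source expects is an assumption that should be checked against the samples README.

```shell
#!/bin/sh
# Sketch: write a Secret manifest for use with
# rabbitmqClusterReference.connectionSecret. All values are placeholders.
cat > rabbitmq-connection-secret.yaml << 'EOF'
apiVersion: v1
kind: Secret
metadata:
  name: test-secret
  namespace: default
type: Opaque
stringData:
  # HTTP URI of the RabbitMQ instance (assumed form; check the samples README)
  uri: "https://rabbitmq.example.com"
  username: "my-user"
  password: "my-password"
  # Optional; the Source parameters table gives 5672 as the default
  port: "5672"
EOF
```

After reviewing the file, it can be applied with kubectl apply --filename rabbitmq-connection-secret.yaml.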

Many parameters do not need to be specified. Unspecified optional parameters default to false or "" (empty string). A minimal Source looks like this:

apiVersion: sources.knative.dev/v1alpha1
kind: RabbitmqSource
metadata:
  name: rabbitmq-source
spec:
  rabbitmqClusterReference:
    name: rabbitmq
  rabbitmqResourcesConfig:
    queueName: 'a-queue'
    predeclared: true
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

Once the Source has been created in the cluster, it reports readiness and errors through the status field of the object.
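
One way to read that status is with kubectl. The helpers below are a minimal sketch: the function names are hypothetical, the defaults match the manifest above, and the Ready condition type is assumed from the usual Knative convention rather than taken from this document.

```shell
#!/bin/sh
# Sketch: helpers for inspecting a RabbitmqSource's status.
# Function names are illustrative; defaults match the manifest above.
source_conditions() {
  name="${1:-rabbitmq-source}"
  namespace="${2:-default}"
  # Print the status conditions recorded on the object.
  kubectl get rabbitmqsource "$name" --namespace "$namespace" \
    --output jsonpath='{.status.conditions}'
}

wait_for_source() {
  name="${1:-rabbitmq-source}"
  namespace="${2:-default}"
  # Block until the (assumed) Ready condition is True, or fail after two minutes.
  kubectl wait rabbitmqsource/"$name" --namespace "$namespace" \
    --for=condition=Ready --timeout=120s
}
```

Calling wait_for_source with no arguments waits on rabbitmq-source in the default namespace. If it times out, source_conditions shows the recorded conditions and their messages.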

Configuration Options

  • Event source parameters.

    • Configure channel config properties based on this documentation.

      1. To get round-robin behavior between consumers consuming from the same queue on
      different connections, set the parallelism to 1, and the next available
      message on the server will be delivered to the next available consumer.
      
      2. If your consumer work time is reasonably consistent and not much greater
      than two times your network round trip time, you will see significant
      throughput improvements starting with a parallelism of 2 or slightly
      greater, as described by the RabbitMQ benchmarks:
      http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
      
  • Observability Configuration

  • Logging Configuration: ConfigMaps may be used to manage the logging and metrics configuration.
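
As a sketch of how the parallelism guidance above, together with the delivery fields from the Source parameters table, might look in a manifest, the snippet below writes one to a file. The file name and the chosen values are illustrative, and the ISO 8601 form of backoffDelay is an assumption based on how Knative expresses durations elsewhere.

```shell
#!/bin/sh
# Sketch: a RabbitmqSource manifest that sets parallelism and delivery.
cat > rabbitmq-source-tuned.yaml << 'EOF'
apiVersion: sources.knative.dev/v1alpha1
kind: RabbitmqSource
metadata:
  name: rabbitmq-source
  namespace: default
spec:
  rabbitmqClusterReference:
    name: rabbitmq
    namespace: default
  rabbitmqResourcesConfig:
    exchangeName: "eventing-rabbitmq-source"
    queueName: "eventing-rabbitmq-source"
    # Values above 1 trade FIFO ordering for throughput (allowed range 1 to 1000)
    parallelism: 2
  delivery:
    retry: 5
    backoffPolicy: "exponential"
    # Assumed ISO 8601 duration format
    backoffDelay: "PT1S"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOF
```

The file can then be applied with kubectl apply --filename rabbitmq-source-tuned.yaml. Keeping parallelism at 1 instead preserves message ordering.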

Next Steps

Check out the Source Samples Directory in this repo and start converting your messages to CloudEvents with Eventing RabbitMQ!

Additional Resources

Upgrade

  • Prior to upgrading eventing-rabbitmq, Knative and its components should be updated according to instructions here. Be sure to pay attention to any steps for upgrading Custom Resource Definitions (CRDs) and only upgrade one minor version at a time.
  • Upgrade RabbitMQ Cluster Operator and RabbitMQ Topology Operator

Upgrade eventing-rabbitmq one minor version at a time while following any migration steps outlined in release notes to migrate the RabbitMQ Source CRD. Components and resources can be applied in a similar fashion to installation:

kubectl apply --filename https://github.com/knative-extensions/eventing-rabbitmq/releases/download/knative-v1.4.0/rabbitmq-source.yaml

Uninstall

Remove eventing-rabbitmq components and resources

Use kubectl delete --filename <installation-file> to remove the components installed during Installation. For example:

kubectl delete --filename https://github.com/knative-extensions/eventing-rabbitmq/releases/download/v0.25.0/rabbitmq-source.yaml

If ko was used to install, it can also be used to uninstall:

ko delete -f config/source/

Uninstall Knative Serving and Eventing

Follow the instructions here to uninstall Knative components.