FsBunny


Streaming API for RabbitMQ

FsBunny implements a streaming API on top of official RabbitMQ client for implementation of event-driven systems.

The core idea is that while there are many streams carrying many messages using many serializers, each stream is dedicated to a single type of message serialized using certain serializer.

It works under assumptions that:

  • we never want to loose a message
  • the exchange + topic tells us the message type (as well as serialization format)
  • the consumer is long-lived and handles only one type of message
  • the consumer decides when to pull the next message of a queue
  • the publishers can be long- or short- lived and address any topic
  • we have multiple serialization formats and may want to add new ones easily

Installing

The FsBunny library can be installed from NuGet:
PM> Install-Package FsBunny

Example

This example demonstrates the API defined by FsBunny:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
#r "FsBunny.dll"
#r "RabbitMQ.Client.dll"
open FsBunny

let ``RabbitMQ raw event roundtrips``() = 
    use streams = 
           RabbitMqEventStreams(
              RabbitMQ.Client.ConnectionFactory(), // underlying connection factory
              "amq.topic",                         // default exchange
              3us,                                 // number of reconnect retries (publisher only)
              2000us) :> EventStreams              // prefetch limit
    
    use consumer = streams.GetConsumer<int> 
                    Temporary 
                    (Routed("amq.topic", "test.*.sample")) // Routed over "amq.topic" on key "test.*.sample"
                    (fun (topic,headers,payload) -> int(topic.Split('.').[1])) // assemble the message from RMQ primitives
    let publish = streams.GetPublisher<int> 
                    (fun x -> Routed("amq.topic", sprintf "test.%d.sample" x),None,[||]) // see the tutorial for details
    
    publish 1

    match consumer.Get 10<s> with
    | Some r -> printf "Got 1: %A" (r.msg = 1)
    | _ -> failwith "should have gotten the message we just sent"

Upgrading from v1.x

  • EventStreams and Consumer now implement IDisposable, so you can release them whenever you don't need them anymore.
  • The consumer implementation will no longer try and interleave the Get and Ack/Nack, fully blocking the gets instead.

Samples & documentation

  • Tutorial goes into more details.

  • API Reference contains automatically generated documentation for all types, modules and functions in the library. This includes additional brief samples on using most of the functions.

Contributing and copyright

The project is hosted on GitHub where you can report issues, fork the project and submit pull requests.

The library is available under Apache license, which allows modification and redistribution for both commercial and non-commercial purposes. For more information see the License file in the GitHub repository.

Copyright 2017 et1975 Technologies Inc

val ( RabbitMQ raw event roundtrips ) : unit -> unit

Full name: Index.( RabbitMQ raw event roundtrips )
val streams : System.IDisposable
namespace RabbitMQ
namespace RabbitMQ.Client
Multiple items
type ConnectionFactory =
  new : unit -> ConnectionFactory
  val UserName : string
  val Password : string
  val VirtualHost : string
  val RequestedChannelMax : uint16
  val RequestedFrameMax : uint32
  val RequestedHeartbeat : uint16
  val ClientProperties : IDictionary
  val Ssl : SslOption
  val HostName : string
  ...

Full name: RabbitMQ.Client.ConnectionFactory

--------------------
RabbitMQ.Client.ConnectionFactory() : unit
val consumer : System.IDisposable
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val publish : (int -> unit)
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
val r : obj
val printf : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printf
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
Fork me on GitHub