FsBunny


Namespaces

FsBunny is organized into 3 APIs:

  • EventStreams API - connects to RabbitMQ and provides constructors for publishers and consumers
  • Assemblers - modules for packing and unpacking messages in and out of RabbitMQ primitives
  • Publisher and Consumer types - the primary means of interaction

EventStreams API

RabbitMqEventStreams implements the API; you'll need only one instance per process. It takes several arguments:

#r "FsBunny.dll"
#r "RabbitMQ.Client.dll"
open FsBunny

let streams = 
    new RabbitMqEventStreams(
            RabbitMQ.Client.ConnectionFactory(), // underlying connection factory
            "amq.topic",                         // default exchange
            3us,                                 // number of reconnect retries (publisher only)
            2000us) :> EventStreams              // prefetch limit

The API is thread-safe: every consumer and publisher gets a dedicated channel. The connection is obtained when required. A Publisher will attempt to reconnect the specified number of times, and a Consumer will ensure a connection on every Get request.

Assemblers and Messages

Assemblers grew out of serializers, from the need to inspect or provide metadata in addition to the message body. Messages can be anything, and it is up to the assembler to figure out how to map them to and from RMQ primitives. For example, an MQTT status message has no payload, and the topic itself carries the ONLINE/OFFLINE indication.

You may want to provide your own; an implementation is as simple as two functions. Here's an example of a Fable-compatible JSON assembler:

#r "Newtonsoft.Json.dll"
#r "Fable.JsonConverter.dll"

module JsonAssembly =
    open FsBunny
    open System
    open System.Collections.Generic // needed for IDictionary<_,_> below
    open System.IO
    open Newtonsoft.Json

    let serializer = 
        let s = JsonSerializer.CreateDefault()
        s.Converters.Add(Fable.JsonConverter())
        s.Converters.Add(Converters.IsoDateTimeConverter (DateTimeFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss.fff'Z'"))
        s

    // disassemble a message into RMQ primitives: target exchange, properties and payload
    let disassembler exchange (item:'T) : (Exchange * IDictionary<string, obj> option * byte []) = 
        use ms = new MemoryStream()
        use sw = new StreamWriter (ms)
        use jw = new JsonTextWriter (sw)
        serializer.Serialize (jw,item)
        jw.Flush()
        exchange, None, ms.ToArray()

    // assemble a message from RMQ primitives
    let assembler (topic:string, properties:IDictionary<string, obj>, payload:byte[]) : 'T = 
        use ms = new MemoryStream(payload)
        use sr = new StreamReader (ms)
        use jr = new JsonTextReader (sr)
        serializer.Deserialize<'T>(jr)

    // and let's expose the functionality as extensions on EventStreams
    type FsBunny.EventStreams with 
        /// Construct a consumer, using specified message type, queue, the exchange to bind to and Json assembler.
        member this.GetJsonConsumer<'T> (queue: Queue) (exchange:Exchange) : Consumer<'T> =
            this.GetConsumer<'T> queue exchange assembler

        /// Construct a publisher for the specified message type using Json disassembler.
        member this.GetJsonPublisher<'T> (exchange: Exchange) : Publisher<'T> = 
            this.GetPublisher<'T> (disassembler exchange)

        /// Use a publisher with a continuation for the specified message type using Json disassembler.
        member this.UsingJsonPublisher<'T> (exchange: Exchange) (cont:Publisher<'T> -> unit) : unit =
            this.UsingPublisher<'T> (disassembler exchange) cont

If a message cannot be assembled from RMQ primitives, it will be Nacked and go into a Dead Letter queue (if one is set up).
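
The MQTT status case mentioned earlier, where the topic alone carries the ONLINE/OFFLINE indication, could be handled by an assembler that ignores the payload entirely. The following is only a sketch: the `DeviceStatus` type, the `statusAssembler` name and the `devices.<id>.ONLINE` topic layout are assumptions made for illustration and are not part of FsBunny. It reuses the `(topic, properties, payload)` tuple shape of the JSON assembler above, and it assumes that raising an exception is how an assembler signals a failure.

```fsharp
open System.Collections.Generic

// Hypothetical message type for illustration; not part of FsBunny.
type DeviceStatus =
    | Online of deviceId: string
    | Offline of deviceId: string

// Assumes topics shaped like "devices.<id>.ONLINE" or "devices.<id>.OFFLINE".
// Properties and payload are ignored: the topic carries all the information.
let statusAssembler (topic: string, _properties: IDictionary<string, obj>, _payload: byte[]) : DeviceStatus =
    match topic.Split '.' with
    | [| "devices"; id; "ONLINE" |] -> Online id
    | [| "devices"; id; "OFFLINE" |] -> Offline id
    | _ -> failwithf "Unrecognized status topic: %s" topic
```

Because the function has the same shape as `assembler` above, it could presumably be passed to `GetConsumer` in the same way.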

Publisher

type SomeRecord = { x : int }

open JsonAssembly // import our assembler

let sendSomeRecord() =
    let send = streams.GetJsonPublisher (streams.Default()) // sending to the default exchange
    send { x = 1 } 
    // can keep sending messages

A Publisher is just a function that takes a message and returns when publishing has completed. There are two ways to obtain one:

  • For long-living use, as in the example above,
  • Or for immediate use and disposal:
let sendSomeRecordDisposeResources() =
    streams.UsingJsonPublisher
        (streams.Default())
        (fun send -> send { x = 1 }) 

    // or, using an equivalent infix operator
    { x = 2 } |-> streams.UsingJsonPublisher (streams.Default())

Consumer

A Consumer implicitly creates a queue, subscribes to a topic on the specified Exchange and starts listening for messages. A Consumer can bind to a Persistent or a Temporary queue. Temporary queues are given Guid names, Ack messages automatically and are deleted once the consumer is garbage-collected. For guaranteed processing, use a Persistent queue; you are then expected to Ack/Nack messages explicitly.

open FSharp.Data.UnitSystems.SI.UnitSymbols

let createConsumers () =
    let temp = streams.GetJsonConsumer<int> 
                    Temporary (Routed("amq.topic", "test.roundtrip"))
    let persistent = streams.GetJsonConsumer<int64> 
                        (Persistent "my_queue") (Routed("amq.topic", "test.roundtrip"))

Once created, we can start polling them, specifying the timeout (internally the implementation prefetches, so a poll doesn't cause a request to the server):

    match temp.Get 10<s> with
    | Some r -> printf "Got an int32: %A" r.msg
    | _ -> ()

The polling API may seem inefficient, but for systems that implement back-pressure this works out quite well.
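
To illustrate why pulling suits back-pressure, here is a minimal sketch of a pull loop that asks for the next message only after the current one has been handled. The `pollUntilIdle` helper is hypothetical and not part of FsBunny; `poll` stands in for a call such as `fun () -> temp.Get 10<s>`, mapped to the message itself.

```fsharp
// Hypothetical helper, not part of FsBunny.
// Pulls one message at a time, so the next message is requested only after
// the current one has been handled. Stops at the first timeout (None) and
// returns the number of messages handled.
let pollUntilIdle (poll: unit -> 'T option) (handle: 'T -> unit) : int =
    let rec loop handled =
        match poll () with
        | Some msg ->
            handle msg
            loop (handled + 1)
        | None -> handled
    loop 0
```

A slow `handle` simply delays the next `poll`, so the consumer never takes on more work than it can process.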

With the persistent consumer, once we have processed the message we need to either acknowledge it or indicate a failure:

    match persistent.Get 10<s> with
    | Some r -> printf "Got an int64: %A" r.msg
                persistent.Ack r.id
                // or persistent.Nack r.id
    | _ -> ()
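
One way to make the Ack/Nack decision explicit is to tie it to whether the handler raised. The sketch below is an assumption-laden illustration rather than FsBunny code: `Outcome` and `processThenSettle` are hypothetical names, and `ack` and `nack` stand in for closures over `persistent.Ack r.id` and `persistent.Nack r.id`.

```fsharp
// Hypothetical result type for illustration; not part of FsBunny.
type Outcome =
    | Acked
    | Nacked

// Runs the handler, then acknowledges on success or rejects on failure.
// `ack` is called outside the try block, so an exception raised by the
// acknowledgement itself is not mistaken for a handler failure.
let processThenSettle (handler: 'T -> unit) (ack: unit -> unit) (nack: unit -> unit) (msg: 'T) : Outcome =
    let succeeded =
        try
            handler msg
            true
        with _ ->
            false
    if succeeded then
        ack ()
        Acked
    else
        nack ()
        Nacked
```

Inside the `Some r` branch above, this might be called as `processThenSettle handle (fun () -> persistent.Ack r.id) (fun () -> persistent.Nack r.id) r.msg`, where `handle` is your own processing function.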

Consumer API is thread-safe.
