Namespaces
FsBunny is organized into 3 APIs:
- EventStreams API - connects to RabbitMQ and provides constructors for publisher and consumers
- Assembers - modules for packing and unpacking messages in and out of RabbitMQ primitives
- Publisher and Consumer types - the primary means of interaction
EventStreams API
RabbitMqEventStreams implements the API, you'll need only one instance per process. It takes several arguments:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: |
|
The API is thread-safe, every consumer and publisher gets a dedicated channel.
The connection is obtained when required and in case of Publisher will attempt to reconnect specified number of time times.
The consumer will ensure a connection on every Get
request.
Assemblers and Messages
Assemblers grew out of serializers by necessity to inspect/provide metadata in addition to message body. Messages can be anything, and it's up to assembler to figure out how to map it to/from RMQ primitives. For example, MQTT status message has no payload and the topic itself carries the ONLINE/OFFLINE indication.
You may want to provide your own and the implementation is as simple as two functions, here's an example of a Fable-compatible JSON assembler:
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 35: 36: 37: 38: 39: 40: 41: 42: 43: 44: |
|
In case of a failure to assemble a message from RMQ primitives, the message will be Nacked and go into a Dead Letter queue (if one is setup).
Publisher
1: 2: 3: 4: 5: 6: 7: 8: |
|
Publisher is just a function that takes a message and returns when completed and there are two ways to obtain it:
- For long-living use, as in example above,
- Or for immediate use and disposal:
1: 2: 3: 4: 5: 6: 7: |
|
Consumer
Consumer implicitly creates a queue, subscribes to a topic on the specified Exchange
and starts listening for messages.
Consumer can bind to a Persistent
or Temporary
queue (temporary queues will be given Guid names, Ack messages automatically and be deleted once the consumer is garbage-collected).
For guaranteed processing you'll use Persistent queue and is expected to Ack/Nack messages explicitly.
1: 2: 3: 4: 5: 6: 7: |
|
Once created, we can start polling them, specifying the timeout (interanally the implementation prefetches and doesn't cause a request to the server):
1: 2: 3: |
|
The polling API may seem inefficient, but for systems that implement back-pressure this works out quite well.
Using the persistent consumer, once we have processed the message we need to acknowledge or indicate a failure otherwise:
1: 2: 3: 4: 5: |
|
Consumer API is thread-safe.
Full name: Tutorial.streams
type ConnectionFactory =
new : unit -> ConnectionFactory
val UserName : string
val Password : string
val VirtualHost : string
val RequestedChannelMax : uint16
val RequestedFrameMax : uint32
val RequestedHeartbeat : uint16
val ClientProperties : IDictionary
val Ssl : SslOption
val HostName : string
...
Full name: RabbitMQ.Client.ConnectionFactory
--------------------
RabbitMQ.Client.ConnectionFactory() : unit
Full name: Tutorial.JsonAssembly.serializer
Full name: Tutorial.JsonAssembly.disassembler
member Add : key:'TKey * value:'TValue -> unit
member ContainsKey : key:'TKey -> bool
member Item : 'TKey -> 'TValue with get, set
member Keys : ICollection<'TKey>
member Remove : key:'TKey -> bool
member TryGetValue : key:'TKey * value:'TValue -> bool
member Values : ICollection<'TValue>
Full name: System.Collections.Generic.IDictionary<_,_>
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
Full name: Microsoft.FSharp.Core.obj
Full name: Microsoft.FSharp.Core.option<_>
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = Byte
Full name: Microsoft.FSharp.Core.byte
type MemoryStream =
inherit Stream
new : unit -> MemoryStream + 6 overloads
member CanRead : bool
member CanSeek : bool
member CanWrite : bool
member Capacity : int with get, set
member CopyToAsync : destination:Stream * bufferSize:int * cancellationToken:CancellationToken -> Task
member Flush : unit -> unit
member FlushAsync : cancellationToken:CancellationToken -> Task
member GetBuffer : unit -> byte[]
member Length : int64
...
Full name: System.IO.MemoryStream
--------------------
MemoryStream() : unit
MemoryStream(capacity: int) : unit
MemoryStream(buffer: byte []) : unit
MemoryStream(buffer: byte [], writable: bool) : unit
MemoryStream(buffer: byte [], index: int, count: int) : unit
MemoryStream(buffer: byte [], index: int, count: int, writable: bool) : unit
MemoryStream(buffer: byte [], index: int, count: int, writable: bool, publiclyVisible: bool) : unit
type StreamWriter =
inherit TextWriter
new : stream:Stream -> StreamWriter + 7 overloads
member AutoFlush : bool with get, set
member BaseStream : Stream
member Close : unit -> unit
member Encoding : Encoding
member Flush : unit -> unit
member FlushAsync : unit -> Task
member Write : value:char -> unit + 3 overloads
member WriteAsync : value:char -> Task + 2 overloads
member WriteLineAsync : unit -> Task + 3 overloads
...
Full name: System.IO.StreamWriter
--------------------
StreamWriter(stream: Stream) : unit
StreamWriter(path: string) : unit
StreamWriter(stream: Stream, encoding: Text.Encoding) : unit
StreamWriter(path: string, append: bool) : unit
StreamWriter(stream: Stream, encoding: Text.Encoding, bufferSize: int) : unit
StreamWriter(path: string, append: bool, encoding: Text.Encoding) : unit
StreamWriter(stream: Stream, encoding: Text.Encoding, bufferSize: int, leaveOpen: bool) : unit
StreamWriter(path: string, append: bool, encoding: Text.Encoding, bufferSize: int) : unit
Full name: Tutorial.JsonAssembly.assembler
type StreamReader =
inherit TextReader
new : stream:Stream -> StreamReader + 10 overloads
member BaseStream : Stream
member Close : unit -> unit
member CurrentEncoding : Encoding
member DiscardBufferedData : unit -> unit
member EndOfStream : bool
member Peek : unit -> int
member Read : unit -> int + 1 overload
member ReadAsync : buffer:char[] * index:int * count:int -> Task<int>
member ReadBlock : buffer:char[] * index:int * count:int -> int
...
Full name: System.IO.StreamReader
--------------------
StreamReader(stream: Stream) : unit
(+0 other overloads)
StreamReader(path: string) : unit
(+0 other overloads)
StreamReader(stream: Stream, detectEncodingFromByteOrderMarks: bool) : unit
(+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding) : unit
(+0 other overloads)
StreamReader(path: string, detectEncodingFromByteOrderMarks: bool) : unit
(+0 other overloads)
StreamReader(path: string, encoding: Text.Encoding) : unit
(+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool) : unit
(+0 other overloads)
StreamReader(path: string, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool) : unit
(+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
(+0 other overloads)
StreamReader(path: string, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
(+0 other overloads)
type Queue<'T> =
new : unit -> Queue<'T> + 2 overloads
member Clear : unit -> unit
member Contains : item:'T -> bool
member CopyTo : array:'T[] * arrayIndex:int -> unit
member Count : int
member Dequeue : unit -> 'T
member Enqueue : item:'T -> unit
member GetEnumerator : unit -> Enumerator<'T>
member Peek : unit -> 'T
member ToArray : unit -> 'T[]
...
nested type Enumerator
Full name: System.Collections.Generic.Queue<_>
--------------------
Queue() : unit
Queue(capacity: int) : unit
Queue(collection: IEnumerable<'T>) : unit
Full name: Microsoft.FSharp.Core.unit
{x: int;}
Full name: Tutorial.SomeRecord
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
from Tutorial
Full name: Tutorial.sendSomeRecord
Full name: Tutorial.sendSomeRecordDisposeResources
Full name: Tutorial.createConsumers
val int64 : value:'T -> int64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int64
--------------------
type int64 = System.Int64
Full name: Microsoft.FSharp.Core.int64
--------------------
type int64<'Measure> = int64
Full name: Microsoft.FSharp.Core.int64<_>
type s = Data.UnitSystems.SI.UnitNames.second
Full name: Microsoft.FSharp.Data.UnitSystems.SI.UnitSymbols.s
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printf
Full name: Microsoft.FSharp.Core.Operators.id