Dynamic load balancing in F#
In this short entry I'll show how to create a small load balancer in F# that optimizes the distribution of a set of computations between multiple nodes. The example makes use of two powerful F# libraries: FSharp.Control.AsyncSeq and FSharpx.Async.
Use case
Today I faced an interesting problem at work - for the purpose of regression testing we run several dozen web service calls, each of which takes a noticeable amount of time (up to 120 seconds). These calls target third-party software which renders a document in PDF format. We have set up a farm of servers to distribute the requests and speed up the regression testing. The software can queue requests, however there's a limit on the number of requests that can run in parallel on a single server (usually 4). Because of that, I decided to come up with a solution in F# which tries to distribute the requests as optimally as possible.
Idea
Initially I tried to split the input set into equal chunks, one for each server. This worked quite well, but after a while it turned out that the requests targeting one server finished much earlier than the others, and the fastest server sat idle for the rest of the time. Because of that I started looking for another solution. Browsing the web, I came across this article by Tomas Petricek, where the author described how to work with asynchronous sequences in F#. The samples and references provided there led me to a different idea.
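For reference, the equal-chunk approach could look roughly like the sketch below. The names machines, inputs and assignStatically are made up for illustration and are not taken from the original code; the point is only that the assignment is fixed up front, so a server that finishes early has nothing more to pick up.

```fsharp
// Hypothetical data: three servers and ten inputs to render.
let machines = [ "server-a"; "server-b"; "server-c" ]
let inputs = [ for i in 1 .. 10 -> sprintf "doc-%d" i ]

/// Splits the inputs into (at most) one chunk per machine and pairs each
/// chunk with a machine. The assignment never changes once it is made.
let assignStatically (machines: string list) (inputs: 'a list) : (string * 'a list) list =
    match machines, inputs with
    | [], _ | _, [] -> []
    | _ ->
        // List.splitInto yields at most `List.length machines` chunks,
        // fewer when there are fewer inputs than machines.
        let chunks = List.splitInto (List.length machines) inputs
        List.zip (List.truncate (List.length chunks) machines) chunks

let assignment = assignStatically machines inputs
```

Each machine would then work through its own chunk independently of the others.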
I wanted to implement an algorithm where each of the servers would initially take the maximum number of requests (4 in my case). Then, whenever a response came back from one of the servers, that server would acquire the next request from the queue. The pattern would continue until no more requests were left in the queue. I pre-ordered the queue by input size, descending, so that the potentially longest-running computations would go first.
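To get a feel for why pulling from a shared queue and putting the longest jobs first can help, here is a small simulation. It is only a sketch: the durations are invented numbers, simulatePull is a hypothetical helper, and the model assumes durations are known in advance (in my case input size was merely a proxy for them).

```fsharp
/// Simulates workers pulling from a shared queue: each job goes to whichever
/// slot becomes free first. Returns the time at which the last job finishes.
/// Assumes slots >= 1.
let simulatePull (slots: int) (durations: int list) : int =
    let finishTimes = Array.create slots 0
    for d in durations do
        // Index of the slot that frees up earliest (the first one on ties).
        let i = finishTimes |> Array.indexed |> Array.minBy snd |> fst
        finishTimes.[i] <- finishTimes.[i] + d
    Array.max finishTimes

// Invented request durations in seconds.
let durations = [ 20; 120; 35; 90; 10; 60 ]
let longestFirst = List.sortDescending durations

let arrivalOrderTotal = simulatePull 2 durations      // 190
let longestFirstTotal = simulatePull 2 longestFirst   // 170
```

With two slots, the longest-first order finishes in 170 seconds instead of 190 for these particular numbers; other inputs will of course give different gaps.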
Code
The solution depends on the libraries mentioned above, FSharp.Control.AsyncSeq and FSharpx.Async, which we can install with Paket.
The load balancer itself is a snippet of 46 lines; its building blocks are described below.
We can imagine callService to be a function that makes the actual HTTP call to a web service.
The process function takes two parameters: machine (server) name and input. It constructs a proper request and fires it at the destination server.
The most important part comes in the renditions function:
- the function takes a list of machines and inputs as its parameters
- workersCount stands for the maximum number of computations performed on a single server
- renditionsCount is just the number of inputs to process
- asyncSeq starts an Async Sequence computation expression
- inside the computation expression, there are two queues: one for computations and the other for actual results of those computations
  - the queues are of type BlockingQueueAgent - this type from the FSharpx.Async library implements a queue as an agent in blocking fashion
  - they allow us to keep track of requests yet to be processed, as well as already processed results
- recursive worker function (inspired by this excerpt):
  - is parametrized by the machine argument
  - extracts a pending request from queue
  - processes the request
  - adds the result to the result queue
  - recursively invokes itself to process another request
  - all of the above actions are performed asynchronously
- the asyncSeq computation expression ends with the following three iterations:
  - each input is added to the queue
  - the specified number of workers is fired (based on workersCount) for each machine
  - the results are yielded from the results queue - we expect precisely renditionsCount outputs
- the return value of the computation expression is finally "piped" (|>) to Async combinators, so that the return type of the renditions function is just an array of results (not wrapped inside Async)
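The steps above can be sketched with FSharp.Core alone. Note the substitutions: instead of BlockingQueueAgent and asyncSeq, the sketch uses a small hand-rolled queue (AsyncQueue, built on MailboxProcessor) and a plain async block, and simulateProcess is a hypothetical stand-in for the real web service call. Treat it as an illustration of the pattern under those assumptions, not as the code described in this post.

```fsharp
open System.Collections.Generic

type QueueMessage<'T> =
    | Add of 'T
    | Get of AsyncReplyChannel<'T>

/// Minimal unbounded queue agent: Add never blocks, AsyncGet waits for an item.
/// A simplified stand-in for BlockingQueueAgent, not that library's implementation.
type AsyncQueue<'T>() =
    let agent =
        MailboxProcessor<QueueMessage<'T>>.Start(fun inbox ->
            let items = Queue<'T>()
            let waiters = Queue<AsyncReplyChannel<'T>>()
            let rec loop () = async {
                let! msg = inbox.Receive()
                match msg with
                | Add item ->
                    // Hand the item to a waiting consumer if there is one.
                    if waiters.Count > 0 then waiters.Dequeue().Reply(item)
                    else items.Enqueue(item)
                | Get reply ->
                    if items.Count > 0 then reply.Reply(items.Dequeue())
                    else waiters.Enqueue(reply)
                return! loop () }
            loop ())
    member _.Add(item: 'T) = agent.Post(Add item)
    member _.AsyncGet() : Async<'T> = agent.PostAndAsyncReply(Get)

/// Hypothetical stand-in for the real call: pretends larger inputs take longer.
let simulateProcess (machine: string) (input: string) : Async<string> = async {
    do! Async.Sleep(10 * input.Length)
    return sprintf "%s rendered %s" machine input }

let renditions (workersCount: int) (machines: string list) (inputs: string list) : string [] =
    let queue = AsyncQueue<string>()     // requests yet to be processed
    let results = AsyncQueue<string>()   // results already produced
    // Pull a request, process it, store the result, then go again.
    let rec worker machine = async {
        let! input = queue.AsyncGet()
        let! output = simulateProcess machine input
        results.Add(output)
        return! worker machine }
    async {
        // Largest inputs first, as a proxy for the longest-running requests.
        inputs |> List.sortByDescending String.length |> List.iter queue.Add
        for machine in machines do
            for _ in 1 .. workersCount do
                Async.Start(worker machine)
        // Collect exactly one result per input. Workers left waiting on the
        // empty queue are simply abandoned in this sketch.
        let collected = ResizeArray<string>()
        for _ in 1 .. List.length inputs do
            let! result = results.AsyncGet()
            collected.Add(result)
        return collected.ToArray() }
    |> Async.RunSynchronously

let outputs = renditions 4 [ "server-a"; "server-b" ] [ "short"; "a much longer input"; "medium one" ]
```

Results arrive in completion order rather than input order, so a caller that needs to match outputs to inputs would have to carry an identifier along with each result.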
This solution met my expectations: the load was balanced dynamically, based on each server's capacity at a given point in time. Unfortunately I haven't run any benchmarks to measure the speedup, but I did observe that the faster servers were no longer idle and didn't have to wait for the others to complete their computations.
Links
For further reading, I'd like to mention again a couple of resources I used when implementing the solution:
- FSharp.Control.AsyncSeq
- FSharpx.Async
- Tomas Petricek's blog post on programming with asynchronous sequences
- Example of a web crawler which also makes use of BlockingQueueAgent