We explain the code for Ref and LockingTransaction.

For background, refer to the previous post, STM in Clojure - Design.

The Ref class

I’m going to leave out some details of the implementation that are not relevant to STM directly – validators, watchers, and agents.

There are two primary classes to deliver: Ref and LockingTransaction. In F#, these will be mutually recursive classes. Ref is by far the simpler of the two. It holds the history list of values. It forwards certain operations to the transaction currently running (on its thread), and provides some basic operations to support LockingTransaction.

The history list is implemented using a doubly-linked circular list.
The list is never empty; it always starts with an entry for the initial value given when the Ref is created. The initial value will be associated with timestamp 0. The ‘root’ of the list, the node that the Ref holds directly, will always have the most recent value.

All accesses to this list must be protected by locking. The requirements mentioned in the previous post match up with the semantics of a System.Threading.ReaderWriterLockSlim. It is incumbent on the LockingTransaction to acquire the lock in read mode when it is reading the value of a Ref and in write mode when it is updating the value of a Ref. We will provide utility methods to make this easy.

Let’s get started. We define the class RefVal to be a node in the doubly-linked circular list for holding values. It must hold a value, a timestamp, and pointers to the next and previous nodes in the list. A freshly created node represents a list of one element, i.e., its next and previous pointers point to the node itself.

In the first code I wrote for this, I had the primary constructor create a node that represented a list of one element. A singleton list is represented by having the pointers to the next and previous nodes point to the node itself. This can be done using a ‘recursive object reference’, as shown here:

type RefVal(v: obj, pt: int64) as this =
    // these initializations are sufficient to create a circular list of one element
    let mutable value : obj = v
    let mutable point : int64 = pt
    let mutable prior : RefVal = this
    let mutable next : RefVal = this

This will do the job. However, the code generated for the class has runtime initialization checks all over the place. As in: every method. There are several ways around this. One involves tagging the class with AllowNullLiteral and setting those fields to null. The other approach is to go unsafe and initialize with Unchecked.defaultof<_>. In either case, we need to make the constructor private and create public factory methods to yield properly initialized objects. I went with the second approach; thus:

[<Sealed>]
type internal RefVal private (v: obj, pt: int64) = 

    let mutable value : obj = v
    let mutable point : int64 = pt

    // these implement a doubly-linked circular list
    // the private constructor leaves them unset; the factory methods below link them up
    let mutable prior : RefVal = Unchecked.defaultof<RefVal>
    let mutable next : RefVal = Unchecked.defaultof<RefVal>

    // create a list of one element
    static member  createSingleton(v, pt) = 
        let r = RefVal(v, pt)
        r.Prior <- r
        r.Next <- r
        r

    // Create a new RefVal and insert it after the given RefVal.
    member this.insertAfter(v, pt) = 
        let r = RefVal(v, pt)        
        r.Prior <- this
        r.Next <- this.Next
        this.Next <- r
        r.Next.Prior <- r
        r

insertAfter does some standard surgery to insert a new node into the list after the current node. It returns the new node. The rest of the code for RefVal is straightforward.

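    // Ref will need some accessors to do iterations
    member _.Value with get() = value
    member _.Point with get() = point
    // Prior and Next need setters: createSingleton and insertAfter assign to them
    member _.Prior with get() = prior and set(v: RefVal) = prior <- v
    member _.Next with get() = next and set(v: RefVal) = next <- v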

    // We will need to update the value in the root node
    member _.SetValue(v: obj, pt: int64) = 
        value <- v
        point <- pt

    // There is one special operation to reset the root node so that the list has only this one element.
    member this.Trim() = 
        prior <- this
        next <- this
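
To see what the ring looks like after a few insertions, here is a minimal self-contained sketch. Node is a stripped-down stand-in for RefVal, and pointsNewestFirst is an illustrative helper; neither name comes from the actual source. Each inserted node is treated as the new root, which is how the growth path in Ref uses insertAfter.

```fsharp
// Illustrative stand-in for RefVal: a node in a circular doubly-linked list.
type Node private (v: obj, pt: int64) =
    let mutable prior : Node = Unchecked.defaultof<Node>
    let mutable next : Node = Unchecked.defaultof<Node>
    member _.Value = v
    member _.Point = pt
    member _.Prior with get() = prior and set(n: Node) = prior <- n
    member _.Next with get() = next and set(n: Node) = next <- n

    // A list of one element: both links point back at the node itself.
    static member createSingleton(v, pt) =
        let r = Node(v, pt)
        r.Prior <- r
        r.Next <- r
        r

    // Splice a new node in directly after this one and return it.
    member this.insertAfter(v, pt) =
        let r = Node(v, pt)
        r.Prior <- this
        r.Next <- this.Next
        this.Next <- r
        r.Next.Prior <- r
        r

// Walk backward from the root via Prior, collecting timestamps newest-first.
let pointsNewestFirst (root: Node) =
    let rec loop (n: Node) acc =
        let acc = n.Point :: acc
        if obj.ReferenceEquals(n.Prior, root) then List.rev acc
        else loop n.Prior acc
    loop root []

let n0 = Node.createSingleton(box "a", 0L)
let n1 = n0.insertAfter(box "b", 5L)   // n1 is the new root
let n2 = n1.insertAfter(box "c", 9L)   // n2 is the new root
printfn "%A" (pointsNewestFirst n2)    // [9L; 5L; 0L]
```

Following Prior from the root visits values from newest to oldest, and following Next from the root lands on the oldest entry first.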

Now we can proceed with the Ref class.

[<AllowNullLiteral>]
type Ref(initVal: obj) =

    // the root node of the history list
    // holds the most recent value.
    // Initialized to the initial value supplied for the Ref and timestamp 0
    // (the RefVal constructor is private, so we go through the factory method)
    let mutable rvals = RefVal.createSingleton(initVal, 0L)

    // We need a lock to protect access
    let lock = new System.Threading.ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion)

    // We need a field to hold the 'stamp' a transaction puts on the Ref. 
    // The details do not matter to us here.  There may not be a stamp, so we use an option type.
    let mutable txInfo : LTInfo option = None

    // We need these history size bounds.
    [<VolatileField>]
    let mutable minHistory = 0 

    [<VolatileField>]
    let mutable maxHistory = 10

    // This little goodie we'll need later. 
    // It counts the number of read faults that have occurred, used to decide when we need to grow the history list.
    let faults = AtomicInteger()

We’re going to need some basic accessors for LockingTransaction and clojure.core to use.

    member _.TxInfo
        with get () = txInfo
        and set(v) = txInfo <- v

    member _.MinHistory with get() = minHistory
    member _.MaxHistory with get() = maxHistory
    member this.SetMinHistory(v) = minHistory <- v; this
    member this.SetMaxHistory(v) = maxHistory <- v; this

Some little helpers for dealing with locking:

    member _.enterReadLock() = lock.EnterReadLock()
    member _.exitReadLock() = lock.ExitReadLock()
    member _.enterWriteLock() = lock.EnterWriteLock()
    member _.exitWriteLock() = lock.ExitWriteLock()
    member _.tryEnterWriteLock(msecTimeout : int) = lock.TryEnterWriteLock(msecTimeout)
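
As an aside, the enter/exit pairs can also be wrapped in higher-order helpers so a caller cannot forget the release. The sketch below is not part of Ref; withReadLock and withWriteLock are hypothetical names, and the only API used is System.Threading.ReaderWriterLockSlim.

```fsharp
open System.Threading

// Hypothetical helpers: run f while holding the lock, always releasing it.
let withReadLock (rw: ReaderWriterLockSlim) (f: unit -> 'T) : 'T =
    rw.EnterReadLock()
    try f ()
    finally rw.ExitReadLock()

let withWriteLock (rw: ReaderWriterLockSlim) (f: unit -> 'T) : 'T =
    rw.EnterWriteLock()
    try f ()
    finally rw.ExitWriteLock()

let rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion)
let current = ref 0

withWriteLock rw (fun () -> current.Value <- 42)
let seen = withReadLock rw (fun () -> current.Value)

// The lock is released again once the helper returns.
printfn "%d, read lock still held: %b" seen rw.IsReadLockHeld
```

The transaction code in this post cannot always use this shape, because it sometimes holds a lock across several method calls (ensured Refs keep their read lock until the attempt ends), which is why Ref exposes the raw enter/exit methods.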

We get to see a little locking action here. getHistoryCount is needed to implement ref-history-count in Clojure itself. We need to acquire at least a read lock anytime we access rvals.

    // Count the entries in the values list
    // the caller should lock
    member private _.histCount() =
        let mutable count = 0
        let mutable rv = rvals.Next
        // walk the ring until we arrive back at the root
        while not (LanguagePrimitives.PhysicalEquality rv rvals) do
            count <- count + 1
            rv <- rv.Next
        count

    member this.getHistoryCount() =
        try
            this.enterWriteLock()
            this.histCount()
        finally
            this.exitWriteLock()

    // Get rid of the history, keeping just the current value
    member this.trimHistory() =
        try
            this.enterWriteLock()
            rvals.Trim()
        finally
            this.exitWriteLock()

A few more convenience methods.

    // These need to be called with a lock acquired.
    member _.currentPoint() = rvals.Point
    member _.currentVal() = rvals.Value

    // Add to the fault count.
    member _.addFault() = faults.incrementAndGet() |> ignore

We have a few more public methods to implement to complete the interface. Ref needs to implement interface IDeref. Here is where we get our first interaction with the transaction. As mentioned in the ‘design’ post, what deref looks at depends on whether it is called in a transaction scope or not. LockingTransaction.getRunning() will return None if we are not in a transaction; otherwise, it will return the transaction object. If we are running in a transaction scope, we have to ask the transaction to supply the value because there might be an in-transaction value for this Ref. If we are not, we just read the current value under a read lock.

    interface IDeref with
        member this.deref() =
            match LockingTransaction.getRunning() with
            | None -> 
                try 
                    this.enterReadLock()
                    rvals.Value
                finally
                    this.exitReadLock()
            | Some t -> t.doGet(this)

The Clojure functions ref-set, alter, and commute are all implemented in LockingTransaction. LockingTransaction.getEx() will return the transaction object if we are in a transaction scope and throw an exception if we are not.

    // Set the value (must be in a transaction).
    member this.set(v: obj) = LockingTransaction.getEx().doSet(this, v)

    // Apply a commute to the reference. (Must be in a transaction.)
    member this.commute(fn: IFn, args: ISeq) = LockingTransaction.getEx().doCommute(this, fn, args)

    // Change to a computed value.
    member this.alter(fn: IFn, args: ISeq) = 
        let t = LockingTransaction.getEx()
        t.doSet(this, fn.applyTo(RTSeq.cons(t.doGet(this), args)))

    // Touch the reference.  (Add to the tracking list in the current transaction.)
    member this.touch() = LockingTransaction.getEx().doEnsure(this)

Our final operation in Ref is used by LockingTransaction to update the value of the Ref in the Ref world. Here is where history comes into play. We will expand the history list in two situations: (a) the size of the list is less than the minimum history size; and (b) we had a fault in getting the value of this Ref and we are less than the maximum history size. The fault count is incremented in the LockingTransaction code; we will reset the fault count to zero when we increase the list. These two conditions are tested for in the if clause below. If either condition is satisfied, we add a new RefVal node to the list. If neither condition is met, we replace the oldest value in the list with the new value and make it the new root node. (The statement rvals <- rvals.Next is doing the heavy lifting here; it rotates the oldest entry to be the root, and the next statement changes its value. Draw pictures.)

    // Set the value
    member this.setValue(v: obj, commitPoint: int64) =
        let hcount = this.histCount()
        if (faults.get() > 0 && hcount < maxHistory) || hcount < minHistory then
            rvals <- rvals.insertAfter(v, commitPoint)
            faults.set(0) |> ignore
        else
            rvals <- rvals.Next
            rvals.SetValue(v, commitPoint)
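
If drawing pictures is not your thing, the grow-or-recycle policy can be traced with a small model. This is a sketch under simplifying assumptions: the ring is replaced by an immutable newest-first list, values are strings, and the History record and its field names are invented for the illustration. Only the if condition is taken from setValue above.

```fsharp
// Simplified model of the history policy (not the actual Ref implementation).
type History =
    { Entries: (int64 * string) list   // (commit point, value), newest first
      Faults: int
      MinHistory: int
      MaxHistory: int }

let setValue (v: string) (commitPoint: int64) (h: History) =
    // histCount does not count the root, so it is one less than the entry count
    let hcount = List.length h.Entries - 1
    if (h.Faults > 0 && hcount < h.MaxHistory) || hcount < h.MinHistory then
        // grow: keep every old entry and reset the fault count
        { h with Entries = (commitPoint, v) :: h.Entries; Faults = 0 }
    else
        // recycle: the oldest entry is dropped and its slot becomes the newest
        let kept = List.take (List.length h.Entries - 1) h.Entries
        { h with Entries = (commitPoint, v) :: kept }

let h0 = { Entries = [ (0L, "init") ]; Faults = 0; MinHistory = 0; MaxHistory = 10 }
let h1 = setValue "a" 3L h0                       // no fault: the single entry is overwritten
let h2 = setValue "b" 7L { h1 with Faults = 1 }   // a fault was recorded: grow to two entries
let h3 = setValue "c" 9L h2                       // no fault: the oldest entry is recycled
printfn "%A" (h3.Entries |> List.map fst)         // [9L; 7L]
```

With the default bounds (minimum 0, maximum 10), a Ref keeps a single value until some reader faults, and then grows by one entry per faulting period.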

The LockingTransaction class

The LockingTransaction (LT) class is more complex. It is the class that manages the transactional state and the transactional operations. An LT has a state:

type LTState = 
    | Running = 1L
    | Committing = 2L
    | Retry = 3L
    | Killed = 4L
    | Committed = 5L
    | Stopped = 6L

The state is held in an LTInfo object:

type LTInfo(initState: LTState, startPoint: int64) = 

    // the state of the transaction
    let mutable status : int64 = initState |> int64

    // A latch for ... to be explained later.
    let latch = CountdownLatch(1)

    // Accessors
    member _.Latch = latch
    member _.StartPoint = startPoint

    member _.State
        with get() = Microsoft.FSharp.Core.LanguagePrimitives.EnumOfValue<int64, LTState>(status)
        and set(v:LTState) = status <- int64 v
    
    member this.compareAndSet(oldVal: LTState, newVal: LTState) =
        let origVal = Interlocked.CompareExchange(&status, newVal |> int64, oldVal |> int64)
        origVal = (oldVal |> int64)

    member this.set(newVal: LTState) =
        Interlocked.Exchange(&status, newVal |> int64)

    member this.isRunning =
        let s = Microsoft.FSharp.Core.LanguagePrimitives.EnumOfValue<int64, LTState>(Interlocked.Read(&status)) 
        s = LTState.Running || s = LTState.Committing

The state is an LTState value, but stored as an int64. We are careful to make changes to the status field using Interlocked methods because other threads may mutate this field. Unfortunately, the Interlocked methods won’t work on LTState values, even though they are represented as int64 values. I decided to stick with using the LTState enumeration rather than integer constants, trading a little more complexity here for clarity in the rest of the program. The EnumOfValue calls take care of translating from an int64 back to an LTState.
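
The compare-and-set idiom is worth seeing in isolation, because the whole barging protocol rests on the fact that only one of two competing transitions can win. Below is a minimal sketch; Phase and PhaseCell are illustrative names that mirror the shape of LTState and LTInfo, not the real types.

```fsharp
open System.Threading

type Phase =
    | Running = 1L
    | Committing = 2L
    | Killed = 4L

// Illustrative holder for an enum stored as an int64.
type PhaseCell(init: Phase) =
    let mutable status : int64 = int64 init

    member _.Current =
        LanguagePrimitives.EnumOfValue<int64, Phase>(Interlocked.Read(&status))

    // Succeeds only if the state is still `expected` at the moment of the swap.
    member _.compareAndSet(expected: Phase, next: Phase) =
        Interlocked.CompareExchange(&status, int64 next, int64 expected) = int64 expected

let cell = PhaseCell(Phase.Running)

// The owner moves Running -> Committing first ...
let committing = cell.compareAndSet(Phase.Running, Phase.Committing)
// ... so a later attempt to kill a Running transaction finds nothing to kill.
let killed = cell.compareAndSet(Phase.Running, Phase.Killed)

printfn "%b %b %A" committing killed cell.Current
```

Had the two calls happened in the opposite order, the kill would have succeeded and the move to Committing would have failed instead.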

Now we can define the LockingTransaction class.

[<Sealed>]
type LockingTransaction private () =
   ...

Let’s get some bookkeeping out of the way. We will need to generate timestamps.
A new timestamp is needed each time an LT attempts to execute a transaction, including each retry. We also generate a new timestamp for each commit point. The timestamp is a 64-bit integer; we use an AtomicLong to generate them because multiple threads will be accessing the static field holding the most recent value. The transaction will hold a read point, the timestamp assigned as it begins execution (including for each retry). We keep track of its first-assigned read point, the start point, as a marker of its age relative to other transactions.

    // The current point.
    // Used to provide a total ordering on transactions for the purpose of determining preference on transactions when there are conflicts.
    // Transactions consume a point for init, for each retry, and on commit if writing.
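    // 'member val' evaluates the initializer once; a plain property body would build a new AtomicLong on every access.
    static member val private lastPoint : AtomicLong = AtomicLong() with get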

    // The point at the start of the current retry (or first try).
    let mutable readPoint : int64 = 0L

    // The point at the start of the transaction.
    let mutable startPoint : int64 = 0L    

    // Get/set a new read point value.
    member this.getReadPoint() =
        readPoint <- LockingTransaction.lastPoint.incrementAndGet()

    // Get a commit point value.
    static member getCommitPoint() =
        LockingTransaction.lastPoint.incrementAndGet()

For age computations, we also keep track of the Environment.TickCount value at the start of the transaction.

    // The system ticks at the start of the transaction.
    let mutable startTime : int64 = 0L

We keep track of the transaction’s state in an LTInfo object. This will be changed on every retry, hence the mutable.

    // The state of the transaction.
    // This is an option so we can zero it out when the transaction is done.
    let mutable info : LTInfo option = None
    
    member _.Info = info

We need to detect if a transaction is running on the current thread and access it. We use thread-local storage for this. Note that the type is LockingTransaction option.

    // The transaction running on the current thread.  (Thread-local.)
    [<DefaultValue;ThreadStatic>]
    static val mutable private currentTransaction : LockingTransaction option

We provide several accessors. getRunning returns a LockingTransaction option. getEx returns a LockingTransaction or throws an exception if there is no transaction running. And we have a simple boolean check of existence.

    // Get the transaction running on this thread (throw exception if no transaction). 
    static member getEx() =
        let transOpt = LockingTransaction.currentTransaction
        match transOpt with
        | None -> 
            raise <| InvalidOperationException("No transaction running")
        | Some t ->
            match t.Info with
            | None -> 
                raise <| InvalidOperationException("No transaction running")
            | Some info -> t

    // Get the transaction running on this thread (or None if no transaction).
    static member getRunning() =
        let transOpt = LockingTransaction.currentTransaction
        match transOpt with
        | None -> None
        | Some t ->
            match t.Info with
            | None -> None
            | Some info -> transOpt

    // Is there a transaction running on this thread?
    static member isRunning() = LockingTransaction.getRunning().IsSome

Note the special case of seeing an LT with no LTInfo.
This is a transaction that has been stopped and is working on cleanup. We haven’t gotten rid of it yet but we are in the process of doing so; it is not running.
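
The thread-local behaviour is easy to check in isolation. In the sketch below, Slot is a made-up class with a thread-static option field declared the same way as currentTransaction; a value stored on one thread is invisible on another, where the field still has its default value, which for an option type is None.

```fsharp
open System
open System.Threading

// Illustrative only: a per-thread slot holding an optional string.
type Slot private () =
    [<ThreadStatic; DefaultValue>]
    static val mutable private current : string option

    static member Current
        with get () = Slot.current
        and set (v: string option) = Slot.current <- v

Slot.Current <- Some "tx on main thread"

// Read the slot from a different thread and report what it saw.
let seenElsewhere = ref (Some "not yet read")
let worker = Thread(ThreadStart(fun () -> seenElsewhere.Value <- Slot.Current))
worker.Start()
worker.Join()

printfn "main: %A, worker: %A" Slot.Current seenElsewhere.Value
```

This is why getRunning and getEx never need a lock: each thread can only ever see its own transaction.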

The books that need keeping

In the previous post, we mentioned the collections that LockingTransaction needs to maintain. Here they are:

    // Ref assignments made in this transaction (both sets and commutes).
    let vals  = Dictionary<Ref,obj>()

    // Refs that have been set in this transaction.
    let sets = HashSet<Ref>()

    // Ref commutes that have been made in this transaction.
    let commutes = SortedDictionary<Ref, ResizeArray<CFn>>()

    // The set of Refs holding read locks.
    let ensures = HashSet<Ref>()

And not having a better place to put it, we need an exception to signal that we need to retry:

    // Cached retry exception.
    let retryEx = RetryEx("")

where we have already defined

exception RetryEx of string

Running a transaction

The Clojure-level macros dosync and sync both expand to call LockingTransaction.runInTransaction. This method creates a new LockingTransaction object, sets it as the current transaction, runs the supplied code, and then tries to commit. However, if a transaction is already running on the current thread, we will join the existing transaction; this happens if the body of an outer dosync calls dosync again, either directly or indirectly.

    static member runInTransaction(fn:IFn) : obj =
        let transOpt = LockingTransaction.currentTransaction
        match transOpt with
        | None ->
            // no transaction running.  Start one, put it on the thread, and run the code.
            let newTrans = LockingTransaction()
            LockingTransaction.currentTransaction <- Some newTrans
            try
                newTrans.run(fn)
            finally
                LockingTransaction.currentTransaction <- None
        | Some t ->
            match t.Info with 
            // I'm not sure when this case would happen.  We have a transaction object that is effectively uninitialized.
            // We use 'run' to take care of initializing it.
            | None -> t.run(fn)
            //  In this case, the transaction is properly running.  We can just invoke the body.
            | _ ->  fn.invoke()

Thus runInTransaction takes care of seeing if we are already in a transaction and creating / registering one if not. The real work is in run:

    member this.run(fn:IFn) : obj =
        let mutable finished = false
        let mutable ret = null
        let locked = ResizeArray<Ref>()
             
        let mutable i = 0

        while not finished && i < RetryLimit do 
            try
                try
                    this.getReadPoint()

                    if i = 0 then
                        startPoint <- readPoint
                        startTime <- int64 Environment.TickCount

                    let newLTInfo = LTInfo(LTState.Running, startPoint)

                    info <- Some <| newLTInfo
                    ret <- fn.invoke()

                    // CODE TO COMMIT COMES HERE

                with
                    | :? RetryEx -> ()
                    | ex when  (LockingTransaction.containsNestedRetryEx(ex)) -> ()
                    | _ -> reraise()
            finally

               // CODE TO CLEAN UP AFTER EACH ATTEMPT, whether successful or not                    
                
            i <- i + 1

        if not finished then
            raise <| InvalidOperationException("Transaction failed after reaching retry limit")

        ret

The main body of run is a loop that keeps invoking the body (fn.invoke()) until the commit code succeeds (which sets the finished flag to true) or we have iterated the maximum number of times. If a RetryEx is thrown during either the invocation or the commit, we catch it and continue the loop. If we exit the loop and finished is still false, we throw an exception: we have failed to commit the transaction within the allotted number of attempts.

BTW, that’s a lot of failures:

    // The number of times to retry a transaction in case of a conflict.
    [<Literal>]
    let RetryLimit = 10000
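
Stripped of all the STM bookkeeping, the control flow of run is a bounded retry loop. The sketch below models only that; Retry and runWithRetry are hypothetical names, and the attempt function stands in for “invoke the body, then commit”.

```fsharp
// Hypothetical names throughout; this only models the control flow of `run`.
exception Retry

let runWithRetry (limit: int) (attempt: int -> 'T) : 'T =
    let mutable result = Unchecked.defaultof<'T>
    let mutable finished = false
    let mutable i = 0
    while not finished && i < limit do
        try
            try
                result <- attempt i      // body plus commit
                finished <- true
            with Retry -> ()             // conflict: fall through and loop again
        finally
            ()                           // per-attempt cleanup would go here
        i <- i + 1
    if not finished then
        invalidOp "Transaction failed after reaching retry limit"
    result

// The body "conflicts" on attempts 0 and 1, then succeeds on attempt 2.
let value = runWithRetry 10 (fun i -> if i < 2 then raise Retry else i * 100)
printfn "%d" value   // 200
```

Any exception other than Retry propagates out of the loop immediately, after the finally block has run.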

As noted earlier, we do create a new LTInfo object for each attempt. Before looking at the commit code that was elided above, let’s look at what the function invocation might be doing.

Running the body

We earlier showed methods in Ref that forward action to the LT that’s running on this thread:

    member this.set(v: obj) = LockingTransaction.getEx().doSet(this, v)
    member this.commute(fn: IFn, args: ISeq) = LockingTransaction.getEx().doCommute(this, fn, args)
    member this.alter(fn: IFn, args: ISeq) = 
        let t = LockingTransaction.getEx()
        t.doSet(this, fn.applyTo(RTSeq.cons(t.doGet(this), args)))
    member this.touch() = LockingTransaction.getEx().doEnsure(this)

Also, Ref’s implementation of IDeref.deref calls doGet. That’s the easiest one, so let’s start with it.

        member this.doGet(r: Ref) : obj =
            if info.Value.isRunning then    
                if vals.ContainsKey(r) then
                    vals.[r]
                else
                    let valOpt = 
                        try
                            r.enterReadLock()

                            let rec loop (ver:RefVal) : obj option =
                                if ver.Point <= readPoint then
                                    Some ver.Value
                                elif Object.ReferenceEquals(ver.Prior, r.getRVals()) then
                                    None
                                else
                                    loop ver.Prior

                            loop (r.getRVals())

                        finally
                            r.exitReadLock()

                    match valOpt with
                    |None ->
                        // no version of val precedes the read point
                        r.addFault()
                        raise retryEx
                    |Some v -> v
            else 
                raise retryEx

First, we check to see if this transaction is still running. It was running when getEx was called, but some other thread might have sneaked in and barged us in between then and now. Checking here corresponds to our earlier promise to always check our status. If our transaction is no longer running, we throw a RetryEx.

If we have done a ref-set or alter in this transaction, we will have an in-transaction value stored in vals, so we look for that. Otherwise, we need to go to the Ref r and see what value it has. We do not necessarily need the latest value; we need the most recent value whose timestamp (ver.Point) is no later than the timestamp we are working under (readPoint). The function loop is a simple iteration through the doubly-linked list, starting at the root and following Prior links toward older values. It returns an obj option so we can signal ‘no version in the history precedes the read point’. In that case, we mark on the Ref that a fault has occurred and throw for a retry. If we do find a value, we return it.
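
The version search is easier to see with the ring replaced by a plain list. In this sketch the history is a newest-first list of (point, value) pairs and findVisible is an illustrative name; the comparison is the same one loop makes.

```fsharp
// History modeled as a newest-first list of (point, value) pairs.
let findVisible (readPoint: int64) (history: (int64 * 'T) list) : 'T option =
    history
    |> List.tryFind (fun (point, _) -> point <= readPoint)
    |> Option.map snd

let history = [ (12L, "c"); (8L, "b"); (3L, "a") ]

// A transaction reading at point 10 must not see the commit made at point 12.
printfn "%A" (findVisible 10L history)   // Some "b"

// Every retained version is newer than the read point: this is a fault.
printfn "%A" (findVisible 2L history)    // None
```

The None case is what makes history worthwhile: had the Ref kept an older version around, the reader at point 2 could have proceeded instead of retrying.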

Okay. That’s the easy one. doCommute is not much harder.

        member this.doCommute(r:Ref, fn:IFn, args:ISeq) : obj =
            if not info.Value.isRunning then
                raise retryEx
            
            if not (vals.ContainsKey(r)) then
                let v = 
                    try
                        r.enterReadLock()
                        r.currentVal()
                    finally
                        r.exitReadLock()
                vals[r] <- v

            let mutable fns : ResizeArray<CFn> = null
            if not (commutes.TryGetValue(r, &fns)) then
                fns <- ResizeArray<CFn>()
                commutes[r] <- fns
            fns.Add({ fn = fn; args = args })
            let v = fn.applyTo(RTSeq.cons(vals[r], args))
            vals[r] <- v
            v

Check to make sure our transaction is still running. Check to see if we have an in-transaction value set. If not, grab the current value of the Ref and store it as the in-transaction value (for just a moment). Create an entry for this Ref in commutes with an empty list as the associated value if no entry exists, and add this commute to the list. Then run the commute’s action using the present in-transaction value of the Ref and store that result as the new in-transaction value. The only locking required is a brief hold of a read lock on the Ref to grab the current value.
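
The reason for recording the commute functions, rather than just the result, shows up at commit time, when they are replayed against whatever value the Ref holds then. A minimal sketch, with integers for values and a hypothetical applyAll helper:

```fsharp
// Model: a commute is a function applied to the Ref's value.
let applyAll (fns: (int -> int) list) (start: int) =
    List.fold (fun v f -> f v) start fns

let recorded = [ (fun v -> v + 1); (fun v -> v + 10) ]

// During the body: computed against the value seen when commute was called.
let inTransaction = applyAll recorded 100

// At commit: suppose another transaction has committed 500 in the meantime.
// The recorded functions are replayed against that newer value.
let committed = applyAll recorded 500

printfn "%d %d" inTransaction committed   // 111 511
```

The value returned to the caller inside the transaction (111 here) can therefore differ from what finally lands in the Ref (511), which is acceptable only when the operations really do commute.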

doEnsure is the first place we look at checking for the ‘stamp’ on a Ref:

        // Touch a ref.  (Lock it.)
        member this.doEnsure(r: Ref) =
            if info.Value.isRunning then                 
                if ensures.Contains(r) then
                    ()
                else
                    r.enterReadLock()

                    // someone completed a write after our snapshot
                    if r.currentPoint() > readPoint then
                        r.exitReadLock()
                        raise retryEx
                    
                    match r.TxInfo with
                    | Some refinfo when refinfo.isRunning ->
                        r.exitReadLock()
                        // info is an LTInfo option, so compare against the LTInfo inside it
                        if not (Object.ReferenceEquals(refinfo, info.Value)) then
                            this.blockAndBail(refinfo)
                    | _ -> ensures.Add(r) |> ignore

            else 
                raise retryEx

Check to make sure our transaction is still running. If the Ref is already ensured, this is a no-op. Otherwise, we get a read lock to check the current value of the Ref. First, if someone has done a real commit after we started, we are going to retry, exiting the read lock before throwing. Second, we look at r.TxInfo to see if someone has put a stamp on it. If there is a stamp and the stamper is running, we release the read lock. Now the tricky part I mentioned in the previous post. If someone else has the stamp, we are screwed, so we block-and-bail. If not, then the stamp is our stamp and we just get out; this is a no-op. If there is no stamp or the stamper is not running, then we add the Ref to our ensures collection, keeping the read lock. Phew.

And now the monster:

    // Set the value of a ref inside the transaction.
    member this.doSet(r: Ref, v: obj) =
        if info.Value.isRunning then 
            if commutes.ContainsKey(r) then
                raise <| InvalidOperationException("Can't set after commute")
            if not (sets.Contains(r)) then
                sets.Add(r) |> ignore
                this.lock(r) |> ignore
            vals.[r] <- v
            v
        else
            raise retryEx

Check to make sure our transaction is still running. If the Ref has had a commute call on it already, this is an illegal operation. If the Ref is not already in our sets collection, then we add it to the collection and call lock. And record the in-transaction value for the Ref in the vals map.

That wasn’t so bad.

The monster is hiding in the lock method.

   member private this.lock(r : Ref) =

        // can't upgrade read lock, so release it.
        this.releaseIfEnsured(r)

        let mutable unlocked = true
        try
            this.tryWriteLock(r)
            unlocked <- false

            if r.currentPoint() > readPoint then
                raise retryEx

            let success() = 
                r.TxInfo <- info
                r.currentVal()

            match r.TxInfo with
            // info is an LTInfo option, so compare against the LTInfo inside it
            | Some (refinfo:LTInfo) when refinfo.isRunning && not (obj.ReferenceEquals(refinfo, info.Value)) ->
                if this.barge(refinfo) then
                    success()
                else
                    r.exitWriteLock()
                    unlocked <- true
                    this.blockAndBail(refinfo)
            | _ -> success()
        finally
            if not unlocked then
                r.exitWriteLock()

The first step is to call releaseIfEnsured. This checks if the Ref is in the ensures collection. If it is, we remove it from the collection and release its read lock. This last piece is critical: a Ref is in ensures if and only if we hold a read lock on it.

   member private this.releaseIfEnsured(r: Ref) =
        if ensures.Contains(r) then
            ensures.Remove(r) |> ignore
            r.exitReadLock()

The next thing is to try to get a write lock. If that times out, an exception is thrown. The unlocked flag controls whether we need to release the write lock on our way out of the method.

Now we see some checks that are maybe starting to look familiar. If the Ref’s value in the real world has been set subsequent to the start of our attempt, retry. Next, check the stamp on the Ref. If there is a stamp and the stamper is running and it is not us, we have some work to do. (Next paragraph.) Otherwise, put our stamp on it and return the current value. (Why return the current value? It just gets ignored. I don’t know.)

If there is a running stamper that is not us, well, one of us has got to go. We try to barge the other guy. If we do, then all is good: declare success. If we cannot barge the other guy, then we have to kill our attempt and retry: block-and-bail.

One little subtlety here. Note that if we don’t barge, we release the write lock (and set the indicator flag) before calling block-and-bail, even though the finally clause would release the write lock for us otherwise. This gives other threads a chance to get a lock (read or write) before we finish the block-and-bail. We’re just being polite.

Barging is relatively straightforward.

    member private this.barge(refinfo: LTInfo) =
        let mutable barged = false

        // if this transaction is older, try to abort the other
        if this.bargeTimeElapsed && startPoint < refinfo.StartPoint then
            barged <- refinfo.compareAndSet(LTState.Running, LTState.Killed)
            if barged then
                refinfo.Latch.CountDown()
        barged

First, we don’t try to barge unless we have been around for a while. A little patience is called for.

    // How old our transaction must be before we try to barge another one.
    // Java version has BARGE_WAIT_NANOS, set at 10*1_000_000, which is 10 milliseconds.
    // Environment.TickCount counts milliseconds (not 100-nanosecond ticks), so the equivalent wait is 10.
    [<Literal>]
    let BargeWaitTicks = 10L

    // Determine if sufficient clock time has elapsed to barge another transaction.
    member private _.bargeTimeElapsed = (int64 Environment.TickCount) - startTime > BargeWaitTicks

Second, we only barge the other transaction if we are older. This compares start points instead of read points, so this dates back to the first attempts of each transaction, not the current retry of each. (To determine who is older, look at dates of birth, not who had the most recent birthday.) The compareAndSet uses an Interlocked method. Why might this fail? The other transaction was running just before we got here. But between then and now, someone else may have barged it, or it may already be committing or have committed or somehow gotten aborted. In those cases, even if it might be okay to proceed, it’s too hard to tell, so let’s just retry (via the block-and-bail that will follow a failed barge attempt). If we do barge, we trigger the latch on the other transaction’s LTInfo on our way out. More on this in a minute (actually, 100 milliseconds).
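
The decision itself, separated from the state change, is a two-part test. The sketch below is a hypothetical pure version (canBarge and its parameters are not in the source); elapsed and wait just need to be in the same time unit.

```fsharp
// Hypothetical pure version of the barge test; all parameters are assumptions.
let canBarge (elapsed: int64) (wait: int64) (myStart: int64) (otherStart: int64) =
    elapsed > wait && myStart < otherStart

// We first started at point 7 and the other at point 12, so we are older,
// and we have been around longer than the wait: barging is allowed.
printfn "%b" (canBarge 25L 10L 7L 12L)   // true

// Still older, but we have not been around long enough yet.
printfn "%b" (canBarge 5L 10L 7L 12L)    // false

// Patient enough, but the other transaction is the older one.
printfn "%b" (canBarge 25L 10L 12L 7L)   // false
```

Because start points are unique and totally ordered, two transactions can never each conclude that they are the older one, so at most one side of a conflict is entitled to barge.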

The block-and-bail operation is short:

   member private this.blockAndBail(refinfo: LTInfo) =
        this.stop(LTState.Retry)
        try
            refinfo.Latch.Await(LockWaitMsecs) |> ignore
        with
        | :? ThreadInterruptedException -> ()
        raise retryEx

The stop call does some cleanup to get us ready for a retry:


    // Stop this transaction.
    member private this.stop(state: LTState) =
        match info with
        | None -> ()
        | Some sinfo ->
            lock sinfo (fun () ->
                sinfo.State <- state
                sinfo.Latch.CountDown())
            info <- None
            vals.Clear()
            sets.Clear()
            commutes.Clear()

Next we wait (at least for a little bit) for the other transaction’s latch to count down. Why wait? We just decided to block-and-bail. This only happens when we’ve run into a conflict and we’ve lost the battle. If we just barreled through and started our retry immediately, the other transaction might not have had time to finish and quit running (either completely or just its current iteration) – we’d just get locked again. Why not just wait? Why is the latch involved? Well, if the other transaction gets barged or if it transitions itself to a new state (this happens in stop), the latch will count down. That can release us to proceed more quickly than the full LockWaitMsecs.
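
The difference between sleeping and waiting on a latch can be seen with the standard library alone. In this sketch System.Threading.CountdownEvent stands in for the CountdownLatch used in the post (an assumption made to keep the example self-contained), and the 50 millisecond timeout is arbitrary.

```fsharp
open System.Threading

// CountdownEvent stands in for the CountdownLatch used in the post.
let latch = new CountdownEvent(1)

// Nobody has signaled yet: the wait runs the full timeout and returns false.
let timedOut = not (latch.Wait(50))

// The other transaction stops (or is barged) and counts the latch down.
latch.Signal() |> ignore

// Now the wait returns true immediately instead of sleeping out the timeout.
let released = latch.Wait(50)

printfn "%b %b" timedOut released
```

Either way the waiter proceeds to its retry; the latch only determines how soon.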

Do not fear commitment

We’re almost done. Back in run, we left a hole to fill: // CODE TO COMMIT COMES HERE.

                    if newLTInfo.compareAndSet(LTState.Running,LTState.Committing) then

                        for pair in commutes do
                            let r = pair.Key
                            if sets.Contains(r) then
                                ()
                            else
                                let wasEnsured = ensures.Contains(r)
                                // can't upgrade read lock, so release
                                this.releaseIfEnsured(r)
                                this.tryWriteLock(r)
                                locked.Add(r)

                                if wasEnsured && r.currentPoint() > readPoint then
                                    raise retryEx

                                match r.TxInfo with
                                | Some refinfo when refinfo <> newLTInfo && refinfo.isRunning ->
                                    if not (this.barge(refinfo)) then
                                        raise retryEx
                                | _ -> ()

                                let v = r.currentVal()
                                vals.[r] <- v
                                for f in pair.Value do
                                    vals.[r] <- f.fn.applyTo(RTSeq.cons(vals.[r], f.args))

                        for r in sets do
                            this.tryWriteLock(r)
                            locked.Add(r)

                        // at this point, all values calced, all refs to be written locked
                        // no more client code to be called
                        let commitPoint = LockingTransaction.getCommitPoint()
                        for pair in vals do
                            let r = pair.Key
                            let oldval = r.currentVal()
                            let newval = pair.Value
                            r.setValue(newval, commitPoint)

                        finished <- true
                        newLTInfo.set(LTState.Committed) |> ignore

We try to set our state from Running to Committing; if this fails, we got barged and we don’t try to commit. For each commuted Ref, we check if it was ensured (unlock it if so), get a write lock, then apply all the commute computations in succession, ending up with a final in-transaction value for the Ref. Then get write locks on all the sets Refs.

At this point, all the values are calculated and stored in the vals map and we have write locks on the Refs. Set the values on all the Refs in the real world. Because we have a write lock, no one will see any of these changes until we are done. At the end, we change our state to Committed.

Last and definitely not least: // CODE TO CLEAN UP AFTER EACH ATTEMPT, whether successful or not. This code is in a finally block. Whether we got killed, failed in some other way, or committed, we have to clean up any locks we might be holding. Those are the write locks in the locked collection (acquired only during a commit attempt) and all the read locks on the ensured Refs (whether we committed or are retrying).

                for k = locked.Count - 1 downto 0 do
                    locked.[k].exitWriteLock()
                locked.Clear()
                for r in ensures do
                    r.exitReadLock()
                ensures.Clear()
                this.stop(if finished then LTState.Committed else LTState.Retry)

And that, I hope, suffices.

Onward?

If you have the stomach and the stamina, there is a little bon bon awaiting. Something simple and refreshing.

Part 3: STM in Clojure - Testing