STM in Clojure - Code
We explain the code for Ref and LockingTransaction.
For background, refer to the previous post, STM in Clojure - Design.
The Ref class
I’m going to leave out some details of the implementation that are not relevant to STM directly – validators, watchers, and agents.
There are two primary classes to deliver: Ref and LockingTransaction. In F#, these will be mutually recursive classes. Ref is by far the simpler of the two. It holds the history list of values, forwards certain operations to the transaction running on the current thread, and provides some basic operations to support LockingTransaction.
The history list is implemented as a doubly-linked circular list.
The list is never empty; it always starts with an entry for the initial value given when the Ref is created.
The initial value is associated with timestamp 0.
The ‘root’ of the list, the node that the Ref holds directly, always has the most recent value.
All accesses to this list must be protected by locking. The requirements mentioned in the previous post match up with the semantics of a System.Threading.ReaderWriterLockSlim.
It is incumbent on the LockingTransaction to acquire the lock in read mode when it is reading the value of a Ref and in write mode when it is updating the value of a Ref. We will provide utility methods to make this easy.
Let’s get started. We define the class RefVal to be a node in the doubly-linked circular list for holding values.
It must hold a value, a timestamp, and pointers to the next and previous nodes in the list.
In the first code I wrote for this, I had the primary constructor create a node representing a list of one element. A singleton list is represented by having the next and previous pointers point to the node itself. This can be done using a ‘recursive object reference’, as shown here:
type RefVal(v: obj, pt: int64) as this =
    // these initializations are sufficient to create a circular list of one element
    let mutable value : obj = v
    let mutable point : int64 = pt
    let mutable prior : RefVal = this
    let mutable next : RefVal = this
This will do the job. However, the code generated for the class has runtime initialization checks all over the place. As in: every method. There are a couple of ways around this. One is to tag the class with AllowNullLiteral and set those fields to null. The other is to go unsafe and initialize them with Unchecked.defaultof<_>. In either case, we need to make the constructor private and provide public factory methods that yield properly initialized objects. I went with the second approach; thus:
[<Sealed>]
type internal RefVal private (v: obj, pt: int64) =
    let mutable value : obj = v
    let mutable point : int64 = pt
    // these implement a doubly-linked circular list
    // they are always filled in by createSingleton or insertAfter before use
    let mutable prior : RefVal = Unchecked.defaultof<RefVal>
    let mutable next : RefVal = Unchecked.defaultof<RefVal>

    // create a list of one element
    static member createSingleton(v, pt) =
        let r = RefVal(v, pt)
        r.Prior <- r
        r.Next <- r
        r

    // Create a new RefVal and insert it after the given RefVal.
    member this.insertAfter(v, pt) =
        let r = RefVal(v, pt)
        r.Prior <- this
        r.Next <- this.Next
        this.Next <- r
        r.Next.Prior <- r
        r
insertAfter does some standard surgery to insert a new node into the list after the current node. It returns the new node. The rest of the code for RefVal is straightforward.
// Ref will need some accessors to do iterations.
// Prior and Next also need setters for the list surgery in createSingleton and insertAfter.
member _.Value with get() = value
member _.Point with get() = point
member _.Prior
    with get() = prior
    and set(v) = prior <- v
member _.Next
    with get() = next
    and set(v) = next <- v

// We will need to update the value in the root node
member _.SetValue(v: obj, pt: int64) =
    value <- v
    point <- pt

// There is one special operation to reset the root node so that the list has only this one element.
member this.Trim() =
    prior <- this
    next <- this
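If you would rather not draw the pictures by hand, here is a small stand-alone sketch of the same pointer surgery. It is not the RefVal code: Node, CreateSingleton, InsertAfter, and valuesOldestFirst are hypothetical names, and the values are plain ints instead of obj. It shows the invariant the rest of the code relies on: if each insertAfter result becomes the new root, the root is the newest entry and root.Next wraps around to the oldest.

```fsharp
// Hypothetical stand-in for RefVal: a doubly-linked circular list node.
[<AllowNullLiteral>]
type Node(value: int, point: int64) =
    member val Value = value with get, set
    member val Point = point with get, set
    member val Prior : Node = null with get, set
    member val Next : Node = null with get, set

    // A one-element list: both links point back at the node itself.
    static member CreateSingleton(value: int, point: int64) =
        let n = Node(value, point)
        n.Prior <- n
        n.Next <- n
        n

    // Splice a new node in between this node and its current successor.
    member this.InsertAfter(value: int, point: int64) =
        let n = Node(value, point)
        n.Prior <- this
        n.Next <- this.Next
        this.Next <- n
        n.Next.Prior <- n
        n

// root.Next is the oldest entry; walk forward from it until we are back at the root.
let valuesOldestFirst (root: Node) =
    let rec loop (n: Node) acc =
        if obj.ReferenceEquals(n, root) then List.rev (root.Value :: acc)
        else loop n.Next (n.Value :: acc)
    loop root.Next []

// Each new node becomes the root, so the root always holds the most recent value.
let mutable root = Node.CreateSingleton(1, 0L)
root <- root.InsertAfter(2, 5L)
root <- root.InsertAfter(3, 9L)
```

Walking Prior from the root visits values newest to oldest, which is the direction the transaction code uses when it searches the history.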
Now we can proceed with the Ref class.
[<AllowNullLiteral>]
type Ref(initVal: obj) =
    // the root node of the history list
    // holds the most recent value.
    // Initialized to the initial value supplied for the Ref and timestamp 0.
    // (RefVal's constructor is private, so we go through its factory method.)
    let mutable rvals = RefVal.createSingleton(initVal, 0L)
    // We need a lock to protect access
    let lock = new System.Threading.ReaderWriterLockSlim(System.Threading.LockRecursionPolicy.SupportsRecursion)
    // We need a field to hold the 'stamp' a transaction puts on the Ref.
    // The details do not matter to us here. There may not be a stamp, so we use an option type.
    let mutable txInfo : LTInfo option = None
    // We need these history size bounds.
    [<VolatileField>]
    let mutable minHistory = 0
    [<VolatileField>]
    let mutable maxHistory = 10
    // This little goodie we'll need later.
    // It counts the number of read faults that have occurred, used to decide when we need to grow the history list.
    let faults = AtomicInteger()
We’re going to need some basic accessors for LockingTransaction and clojure.core to use.
member _.TxInfo
    with get () = txInfo
    and set(v) = txInfo <- v
member _.MinHistory with get() = minHistory
member _.MaxHistory with get() = maxHistory
member this.SetMinHistory(v) = minHistory <- v; this
member this.SetMaxHistory(v) = maxHistory <- v; this
Some little helpers for dealing with locking:
member _.enterReadLock() = lock.EnterReadLock()
member _.exitReadLock() = lock.ExitReadLock()
member _.enterWriteLock() = lock.EnterWriteLock()
member _.exitWriteLock() = lock.ExitWriteLock()
member _.tryEnterWriteLock(msecTimeout : int) = lock.TryEnterWriteLock(msecTimeout)
We get to see a little locking action here. getHistoryCount is needed to implement ref-history-count in Clojure itself.
We need to acquire at least a read lock any time we access rvals.
// Count the entries in the history list other than the root (the current value)
// the caller should lock
member private _.histCount() =
    let mutable count = 0
    let mutable rv = rvals.Next
    while not (LanguagePrimitives.PhysicalEquality rv rvals) do
        count <- count + 1
        rv <- rv.Next
    count

member this.getHistoryCount() =
    try
        this.enterWriteLock()
        this.histCount()
    finally
        this.exitWriteLock()

// Get rid of the history, keeping just the current value
member this.trimHistory() =
    try
        this.enterWriteLock()
        rvals.Trim()
    finally
        this.exitWriteLock()
A few more convenience methods.
// These need to be called with a lock acquired.
member _.currentPoint() = rvals.Point
member _.currentVal() = rvals.Value
// Add to the fault count.
member _.addFault() = faults.incrementAndGet() |> ignore
We have a few more public methods to implement to complete the interface. Ref needs to implement the interface IDeref.
Here is where we get our first interaction with the transaction.
As mentioned in the ‘design’ post, what deref looks at depends on whether it is called in a transaction scope or not.
LockingTransaction.getRunning() returns None if we are not in a transaction; otherwise, it returns Some with the transaction object.
If we are running in a transaction scope, we have to ask the transaction to supply the value, because there might be an in-transaction value for this Ref. If not, we just read the current value under a read lock.
interface IDeref with
    member this.deref() =
        match LockingTransaction.getRunning() with
        | None ->
            try
                this.enterReadLock()
                rvals.Value
            finally
                this.exitReadLock()
        | Some t -> t.doGet(this)
The Clojure functions ref-set, alter, and commute are all implemented in LockingTransaction.
LockingTransaction.getEx() returns the transaction object if we are in a transaction scope and throws an exception if we are not.
// Set the value (must be in a transaction).
member this.set(v: obj) = LockingTransaction.getEx().doSet(this, v)
// Apply a commute to the reference. (Must be in a transaction.)
member this.commute(fn: IFn, args: ISeq) = LockingTransaction.getEx().doCommute(this, fn, args)
// Change to a computed value.
member this.alter(fn: IFn, args: ISeq) =
    let t = LockingTransaction.getEx()
    t.doSet(this, fn.applyTo(RTSeq.cons(t.doGet(this), args)))
// Touch the reference. (Add to the tracking list in the current transaction.)
member this.touch() = LockingTransaction.getEx().doEnsure(this)
Our final operation in Ref is used by LockingTransaction to install a newly committed value in the Ref.
Here is where history comes into play. We expand the history list in two situations:
(a) the size of the list is less than the minimum history size; and
(b) we had a fault in getting the value of this Ref and the list is smaller than the maximum history size.
The fault count is incremented in the LockingTransaction code; we reset the fault count to zero when we grow the list.
These two conditions are tested in the if clause below.
If either condition is satisfied, we add a new RefVal node to the list.
If neither condition is met, we replace the oldest value in the list with the new value and make that node the new root.
(The statement rvals <- rvals.Next is doing the heavy lifting here: root.Next is the oldest entry, so this rotates the oldest entry into the root position, and then its value is overwritten. Draw pictures.)
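// Set the value (the caller, the commit code, holds the write lock)
member this.setValue(v: obj, commitPoint: int64) =
    let hcount = this.histCount()
    if (faults.get() > 0 && hcount < maxHistory) || hcount < minHistory then
        // grow: add a new node after the root and make it the root
        rvals <- rvals.insertAfter(v, commitPoint)
        faults.set(0) |> ignore
    else
        // rotate: recycle the oldest node as the new root
        rvals <- rvals.Next
        rvals.SetValue(v, commitPoint)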
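Here is the grow-or-rotate rule pulled out into a small model you can poke at. It is a sketch, not the Ref code: HNode, History, AddFault, and Root are hypothetical names, locking is left out entirely, and the obj values and AtomicInteger fault counter are replaced by strings and a plain mutable int.

```fsharp
// Hypothetical history node (string values instead of obj).
[<AllowNullLiteral>]
type HNode(value: string, point: int64) =
    member val Value = value with get, set
    member val Point = point with get, set
    member val Prior : HNode = null with get, set
    member val Next : HNode = null with get, set

// Hypothetical, lock-free model of just the parts of Ref that setValue touches.
type History(initVal: string, minHistory: int, maxHistory: int) =
    let mutable root =
        let n = HNode(initVal, 0L)
        n.Prior <- n
        n.Next <- n
        n
    let mutable faults = 0

    // Entries other than the root, as histCount counts them.
    member _.HistCount =
        let mutable count = 0
        let mutable n = root.Next
        while not (obj.ReferenceEquals(n, root)) do
            count <- count + 1
            n <- n.Next
        count

    member _.AddFault() = faults <- faults + 1
    member _.Root = root

    member this.SetValue(v: string, commitPoint: int64) =
        let hcount = this.HistCount
        if (faults > 0 && hcount < maxHistory) || hcount < minHistory then
            // grow: splice a new node in after the root and make it the root
            let n = HNode(v, commitPoint)
            n.Prior <- root
            n.Next <- root.Next
            root.Next <- n
            n.Next.Prior <- n
            root <- n
            faults <- 0
        else
            // rotate: root.Next is the oldest entry; reuse it as the new root
            root <- root.Next
            root.Value <- v
            root.Point <- commitPoint
```

With no faults and a minimum of 0, a fresh history simply overwrites its single node. A fault makes the next commit grow the list, and after that a fault-free commit recycles the oldest node, so the history stays the same length.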
The LockingTransaction class
The LockingTransaction (LT) class is more complex. It is the class that manages the transactional state and the transactional operations. An LT has a state:
type LTState =
    | Running = 1L
    | Committing = 2L
    | Retry = 3L
    | Killed = 4L
    | Committed = 5L
    | Stopped = 6L
The state is held in an LTInfo object:
type LTInfo(initState: LTState, startPoint: int64) =
    // the state of the transaction
    let mutable status : int64 = initState |> int64
    // A latch for ... to be explained later.
    let latch = CountdownLatch(1)
    // Accessors
    member _.Latch = latch
    member _.StartPoint = startPoint
    member _.State
        with get() = Microsoft.FSharp.Core.LanguagePrimitives.EnumOfValue<int64, LTState>(status)
        and set(v:LTState) = status <- int64 v
    member this.compareAndSet(oldVal: LTState, newVal: LTState) =
        let origVal = Interlocked.CompareExchange(&status, newVal |> int64, oldVal |> int64)
        origVal = (oldVal |> int64)
    member this.set(newVal: LTState) =
        Interlocked.Exchange(&status, newVal |> int64)
    member this.isRunning =
        let s = Microsoft.FSharp.Core.LanguagePrimitives.EnumOfValue<int64, LTState>(Interlocked.Read(&status))
        s = LTState.Running || s = LTState.Committing
The state is an LTState value, but stored as an int64.
We are careful to make changes to the status field using Interlocked methods because other threads may mutate this field.
Unfortunately, the Interlocked methods won’t work on LTState values, even though they are represented as int64 values.
I decided to stick with the LTState enumeration rather than integer constants, trading a little more complexity here for clarity in the rest of the program.
The EnumOfValue calls take care of translating from an int64 back to an LTState.
Now we can define the LockingTransaction class.
[<Sealed>]
type LockingTransaction private () =
...
Let’s get some bookkeeping out of the way.
We will need to generate timestamps.
A new timestamp is needed each time an LT attempts to execute a transaction, including each retry.
We also generate a new timestamp for each commit point.
The timestamp is a 64-bit integer; we use an AtomicLong to generate them because multiple threads will be accessing the static field holding the most recent value.
The transaction holds a read point, the timestamp assigned as it begins execution (including on each retry).
We also keep track of its first-assigned read point, the start point, as a marker of its age relative to other transactions.
// The current point.
// Used to provide a total ordering on transactions for the purpose of determining preference on transactions when there are conflicts.
// Transactions consume a point for init, for each retry, and on commit if writing.
// 'static let' creates the AtomicLong exactly once. (A static property with an initializer
// expression would construct a fresh AtomicLong on every access.)
static let lastPoint = AtomicLong()

// The point at the start of the current retry (or first try).
let mutable readPoint : int64 = 0L
// The point at the start of the transaction.
let mutable startPoint : int64 = 0L

// Get/set a new read point value.
member this.getReadPoint() =
    readPoint <- lastPoint.incrementAndGet()

// Get a commit point value.
static member getCommitPoint() =
    lastPoint.incrementAndGet()
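The essential property of the point clock is that every caller, on every thread, gets a distinct, increasing number. Here is a minimal sketch of that, assuming Interlocked.Increment on a static field as a stand-in for the AtomicLong the post uses; PointClock and Next are hypothetical names.

```fsharp
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks

// Hypothetical stand-in for the AtomicLong clock: one counter shared by every thread.
type PointClock private () =
    static let mutable lastPoint = 0L

    // Each call returns a fresh, strictly increasing point, even under contention.
    static member Next() = Interlocked.Increment(&lastPoint)

// Draw 4000 points from many threads at once.
let points = ConcurrentBag<int64>()
Parallel.For(0, 4000, fun (_: int) -> points.Add(PointClock.Next())) |> ignore
```

Read points and commit points come from the same counter, so a commit point is always later than every read point handed out before it.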
For age computations, we also keep track of the Environment.TickCount value (milliseconds since system startup) at the start of the transaction.
// The system ticks at the start of the transaction.
let mutable startTime : int64 = 0L
We keep track of the transaction’s state in an LTInfo object.
This will be changed on every retry, hence the mutable.
// The state of the transaction.
// This is an option so we can zero it out when the transaction is done.
let mutable info : LTInfo option = None
member _.Info = info
We need to detect whether a transaction is running on the current thread and access it.
We use thread-local storage for this. Note that the type is LockingTransaction option.
// The transaction running on the current thread. (Thread-local.)
[<DefaultValue;ThreadStatic>]
static val mutable private currentTransaction : LockingTransaction option
We provide several accessors. getRunning returns a LockingTransaction option.
getEx returns a LockingTransaction or throws an exception if there is no transaction running.
And we have a simple boolean check of existence.
// Get the transaction running on this thread (throw exception if no transaction).
static member getEx() =
    let transOpt = LockingTransaction.currentTransaction
    match transOpt with
    | None ->
        raise <| InvalidOperationException("No transaction running")
    | Some t ->
        match t.Info with
        | None ->
            raise <| InvalidOperationException("No transaction running")
        | Some _ -> t

// Get the transaction running on this thread (or None if no transaction).
static member getRunning() =
    let transOpt = LockingTransaction.currentTransaction
    match transOpt with
    | None -> None
    | Some t ->
        match t.Info with
        | None -> None
        | Some _ -> transOpt

// Is there a transaction running on this thread?
static member isRunning() = LockingTransaction.getRunning().IsSome
Note the special case of seeing an LT with no LTInfo.
This is a transaction that has been stopped and is working on cleanup.
We haven’t gotten rid of it yet, but we are in the process of doing so; it is not running.
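A quick way to convince yourself that the thread-static slot really is per thread: set it on one thread and look at it from another. Tx and Current are hypothetical stand-ins for LockingTransaction and its currentTransaction field; the [<ThreadStatic; DefaultValue>] static val mutable declaration is the same pattern used above.

```fsharp
open System.Threading

// Hypothetical transaction object; only a name, for illustration.
type Tx(name: string) =
    member _.Name = name

// Mirrors LockingTransaction.currentTransaction: one slot per thread.
type Current private () =
    // A thread that never assigned the slot sees the default (null), which F# reads as None.
    [<ThreadStatic; DefaultValue>]
    static val mutable private current : Tx option

    static member Get() = Current.current
    static member Set(t: Tx option) = Current.current <- t

Current.Set(Some(Tx("main")))

// Look at the slot from a second thread.
let seenOnOther = ref (Some(Tx("placeholder")))
let worker = Thread(ThreadStart(fun () -> seenOnOther.Value <- Current.Get()))
worker.Start()
worker.Join()
```

The worker sees None, so a transaction started on one thread never leaks into code running on another.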
The books that need keeping
In the previous post, we mentioned the collections that LockingTransaction needs to maintain. Here they are:
// Ref assignments made in this transaction (both sets and commutes).
let vals = Dictionary<Ref,obj>()
// Refs that have been set in this transaction.
let sets = HashSet<Ref>()
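// Ref commutes that have been made in this transaction.
// (SortedDictionary requires Ref to be comparable, e.g., via IComparable; that detail is omitted here.)
let commutes = SortedDictionary<Ref, ResizeArray<CFn>>()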
// The set of Refs holding read locks.
let ensures = HashSet<Ref>()
And not having a better place to put it, we need an exception to signal that we need to retry:
// Cached retry exception.
let retryEx = RetryEx("")
where we have already defined
exception RetryEx of string
Running a transaction
The Clojure-level macro dosync expands to sync, which in turn expands to a call to LockingTransaction.runInTransaction.
This method creates a new LockingTransaction object, sets it as the current transaction, runs the supplied code, and then tries to commit. However, if a transaction is already running on the current thread, we join the existing transaction; this happens if the body of an outer dosync calls dosync again, either directly or indirectly.
static member runInTransaction(fn:IFn) : obj =
    let transOpt = LockingTransaction.currentTransaction
    match transOpt with
    | None ->
        // no transaction running. Start one, put it on the thread, and run the code.
        let newTrans = LockingTransaction()
        LockingTransaction.currentTransaction <- Some newTrans
        try
            newTrans.run(fn)
        finally
            LockingTransaction.currentTransaction <- None
    | Some t ->
        match t.Info with
        // I'm not sure when this case would happen. We have a transaction object that is effectively uninitialized.
        // We use 'run' to take care of initializing it.
        | None -> t.run(fn)
        // In this case, the transaction is properly running. We can just invoke the body.
        | _ -> fn.invoke()
Thus runInTransaction takes care of checking whether we are already in a transaction and creating / registering one if not. The real work is in run:
member this.run(fn:IFn) : obj =
    let mutable finished = false
    let mutable ret = null
    let locked = ResizeArray<Ref>()
    let mutable i = 0
    while not finished && i < RetryLimit do
        try
            try
                this.getReadPoint()
                if i = 0 then
                    startPoint <- readPoint
                    startTime <- int64 Environment.TickCount
                let newLTInfo = LTInfo(LTState.Running, startPoint)
                info <- Some <| newLTInfo
                ret <- fn.invoke()
                // CODE TO COMMIT COMES HERE
            with
            | RetryEx _ -> ()
            | ex when (LockingTransaction.containsNestedRetryEx(ex)) -> ()
            | _ -> reraise()
        finally
            // CODE TO CLEAN UP AFTER EACH ATTEMPT, whether successful or not
            i <- i + 1
    if not finished then
        raise <| InvalidOperationException("Transaction failed after reaching retry limit")
    ret
The main body of run is a loop that keeps (re)trying the body (fn.invoke()) until the commit code succeeds (which sets the finished flag to true) or we reach the maximum number of attempts.
If a RetryEx is thrown during either the invocation or the commit, we catch it and continue the loop.
If we exit the loop and finished is still false, we throw an exception: we have failed to commit the transaction within the allotted number of attempts.
BTW, that’s a lot of failures:
// The number of times to retry a transaction in case of a conflict.
[<Literal>]
let RetryLimit = 10000
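The control flow of run, stripped of all the transaction machinery, is an ordinary retry loop. Here is a minimal sketch of that shape; RetrySignal and runWithRetries are hypothetical names, and the body receives its attempt number only so the example can fail a few times on purpose.

```fsharp
open System

// Hypothetical signal playing the role of RetryEx.
exception RetrySignal of string

// Hypothetical driver: re-run body until an attempt finishes without RetrySignal, at most limit times.
let runWithRetries (limit: int) (body: int -> 'T) : 'T =
    let mutable result : 'T option = None
    let mutable attempt = 0
    while result.IsNone && attempt < limit do
        try
            try
                result <- Some(body attempt)
            with
            | RetrySignal _ -> ()   // conflict: fall through and go around again
        finally
            attempt <- attempt + 1  // per-attempt cleanup would go here
    match result with
    | Some r -> r
    | None -> raise (InvalidOperationException("Transaction failed after reaching retry limit"))
```

Any exception other than the retry signal escapes the loop immediately, just as the reraise() case does in run.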
As noted earlier, we do create a new LTInfo
object for each attempt.
Before looking at the commit code that was elided above, let’s look at what the function invocation might be doing.
Running the body
We showed earlier the methods in Ref that forward actions to the LT running on this thread:
member this.set(v: obj) = LockingTransaction.getEx().doSet(this, v)
member this.commute(fn: IFn, args: ISeq) = LockingTransaction.getEx().doCommute(this, fn, args)
member this.alter(fn: IFn, args: ISeq) =
    let t = LockingTransaction.getEx()
    t.doSet(this, fn.applyTo(RTSeq.cons(t.doGet(this), args)))
member this.touch() = LockingTransaction.getEx().doEnsure(this)
Also, Ref’s implementation of IDeref.deref calls doGet. That’s the easiest one, so let’s start with it.
member this.doGet(r: Ref) : obj =
    if info.Value.isRunning then
        if vals.ContainsKey(r) then
            vals.[r]
        else
            let valOpt =
                try
                    r.enterReadLock()
                    let rec loop (ver:RefVal) : obj option =
                        if ver.Point <= readPoint then
                            Some ver.Value
                        elif Object.ReferenceEquals(ver.Prior, r.getRVals()) then
                            None
                        else
                            loop ver.Prior
                    loop (r.getRVals())
                finally
                    r.exitReadLock()
            match valOpt with
            | None ->
                // no version of val precedes the read point
                r.addFault()
                raise retryEx
            | Some v -> v
    else
        raise retryEx
First, we check whether this transaction is still running. It was running when getEx was called, but some other thread might have sneaked in and barged us between then and now. Checking here keeps our earlier promise to always check our status.
If our transaction is no longer running, we throw a RetryEx.
If we have done a ref-set, alter, or commute on this Ref in this transaction, we will have an in-transaction value stored in vals, so we look for that. Otherwise, we need to go to the Ref r and see what value it has. We do not necessarily need the latest value, just the most recent value whose timestamp (ver.Point) is less than or equal to the timestamp we are working under (readPoint). The function loop is a simple iteration through the doubly-linked list, from newest to oldest via the Prior links. It returns an obj option so we can signal ‘no version in the history precedes the read point’. In that case, we record a fault on the Ref and throw for a retry. If we do find a value, we return it.
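Here is the history search from doGet isolated from locking and the transaction. Ver and readAt are hypothetical names; versions carry strings, and the ring is built by hand through the Prior links only, since that is all the search follows.

```fsharp
// Hypothetical version node: a value, its commit point, and a link to the next-older version.
[<AllowNullLiteral>]
type Ver(value: string, point: int64) =
    member _.Value = value
    member _.Point = point
    member val Prior : Ver = null with get, set

// Return the newest value committed at or before readPoint, or None if the history is too short.
let readAt (root: Ver) (readPoint: int64) : string option =
    let rec loop (ver: Ver) =
        if ver.Point <= readPoint then Some ver.Value
        elif obj.ReferenceEquals(ver.Prior, root) then None   // wrapped around: nothing old enough
        else loop ver.Prior
    loop root

// History c@20 -> b@10 -> a@0, linked through Prior and closed into a ring.
let v0 = Ver("a", 0L)
let v10 = Ver("b", 10L)
let v20 = Ver("c", 20L)
v20.Prior <- v10
v10.Prior <- v0
v0.Prior <- v20

// A shorter history in which the version at point 0 has been recycled away.
let w10 = Ver("b", 10L)
let w20 = Ver("c", 20L)
w20.Prior <- w10
w10.Prior <- w20
```

The None case is the fault: a reader whose read point predates everything still in the history has to retry, and the recorded fault tells the next commit to grow the list.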
Okay. That’s the easy one. doCommute is not much harder.
member this.doCommute(r:Ref, fn:IFn, args:ISeq) : obj =
    if not info.Value.isRunning then
        raise retryEx
    if not (vals.ContainsKey(r)) then
        let v =
            try
                r.enterReadLock()
                r.currentVal()
            finally
                r.exitReadLock()
        vals[r] <- v
    let mutable fns : ResizeArray<CFn> = null
    if not (commutes.TryGetValue(r, &fns)) then
        fns <- ResizeArray<CFn>()
        commutes[r] <- fns
    fns.Add({ fn = fn; args = args })
    let v = fn.applyTo(RTSeq.cons(vals[r], args))
    vals[r] <- v
    v
Check that our transaction is still running. Check whether we have an in-transaction value for the Ref. If not, grab the current value of the Ref and store it as the in-transaction value (for just a moment).
Create an entry for this Ref in commutes, with an empty list as the associated value, if no entry exists, and add this commute to that list.
Then run the commute function on the present in-transaction value of the Ref and store that result as the new in-transaction value.
The only locking required is a brief hold of a read lock on the Ref to grab the current value.
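The reason for recording each commute is that the commit replays them against whatever value is current at commit time, discarding the provisional in-transaction result. Here is that idea in isolation. CommuteLog, Commute, and Replay are hypothetical names, and an int -> int function stands in for an IFn plus its args.

```fsharp
// Hypothetical model of one Ref's commute bookkeeping; int -> int stands in for IFn plus args.
type CommuteLog(snapshot: int) =
    let fns = ResizeArray<int -> int>()
    let mutable inTx = snapshot

    // Inside the transaction: remember the function and apply it to the provisional value.
    member _.Commute(f: int -> int) =
        fns.Add(f)
        inTx <- f inTx
        inTx

    member _.InTxValue = inTx

    // At commit: ignore the provisional value and replay every function on the latest committed value.
    member _.Replay(latest: int) =
        fns |> Seq.fold (fun acc f -> f acc) latest
```

Two increments on a snapshot of 0 give a provisional 2, but if someone else committed 10 in the meantime, the replay yields 12. That is why commute suits functions, like counters, where applying them to a newer value is acceptable.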
doEnsure is the first place where we check the ‘stamp’ on a Ref:
// Touch a ref. (Lock it.)
member this.doEnsure(r: Ref) =
    if info.Value.isRunning then
        if ensures.Contains(r) then
            ()
        else
            r.enterReadLock()
            // someone completed a write after our snapshot
            if r.currentPoint() > readPoint then
                r.exitReadLock()
                raise retryEx
            match r.TxInfo with
            | Some refinfo when refinfo.isRunning ->
                r.exitReadLock()
                // info is an LTInfo option, so compare against the LTInfo it holds
                if not <| Object.ReferenceEquals(refinfo, info.Value) then
                    this.blockAndBail(refinfo)
            | _ -> ensures.Add(r) |> ignore
    else
        raise retryEx
Check that our transaction is still running. If the Ref is already ensured, this is a no-op.
Otherwise, we get a read lock to check the current value of the Ref. First, if someone has committed a new value after we started, we are going to retry, and we exit the read lock before throwing. Second, we look at r.TxInfo to see whether someone has put a stamp on it.
If there is a stamp and the stamper is running, we release the read lock. Now the tricky part I mentioned in the previous post. If someone else has the stamp, we are screwed, so we block-and-bail. If not, then the stamp is ours and we just get out; this is a no-op. If there is no stamp or the stamper is not running, then we add the Ref to our ensures collection, keeping the read lock. Phew.
And now the monster:
// Set the value of a ref inside the transaction.
member this.doSet(r: Ref, v: obj) =
    if info.Value.isRunning then
        if commutes.ContainsKey(r) then
            raise <| InvalidOperationException("Can't set after commute")
        if not (sets.Contains(r)) then
            sets.Add(r) |> ignore
            this.lock(r) |> ignore
        vals.[r] <- v
        v
    else
        raise retryEx
Check that our transaction is still running. If the Ref has already had a commute call on it, this is an illegal operation.
If the Ref is not already in our sets collection, we add it to the collection and call lock.
Finally, we record the in-transaction value for the Ref in the vals map.
That wasn’t so bad.
The monster is hiding in the lock method.
member private this.lock(r : Ref) =
    // can't upgrade read lock, so release it.
    this.releaseIfEnsured(r)
    let mutable unlocked = true
    try
        this.tryWriteLock(r)
        unlocked <- false
        if r.currentPoint() > readPoint then
            raise retryEx
        let success() =
            r.TxInfo <- info
            r.currentVal()
        match r.TxInfo with
        // info is an LTInfo option, so compare against the LTInfo it holds
        | Some (refinfo:LTInfo) when refinfo.isRunning && not <| obj.ReferenceEquals(refinfo, info.Value) ->
            if this.barge(refinfo) then
                success()
            else
                r.exitWriteLock()
                unlocked <- true
                this.blockAndBail(refinfo)
        | _ -> success()
    finally
        if not unlocked then
            r.exitWriteLock()
The first thing lock does is call releaseIfEnsured. This checks whether the Ref is in the ensures collection. If it is, we remove it from the collection and release its read lock. This last piece is critical: a Ref is in ensures if and only if we hold a read lock on it, and a read lock cannot be upgraded to a write lock.
member private this.releaseIfEnsured(r: Ref) =
    if ensures.Contains(r) then
        ensures.Remove(r) |> ignore
        r.exitReadLock()
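Why not just upgrade the read lock in place? ReaderWriterLockSlim refuses: a thread holding a read lock that calls EnterWriteLock gets a LockRecursionException, even with LockRecursionPolicy.SupportsRecursion. This sketch uses only the real System.Threading API; the names rw, upgradeRejected, and writeAfterRelease are just for illustration.

```fsharp
open System.Threading

let rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion)

// Try to "upgrade": take the write lock while this thread still holds the read lock.
let upgradeRejected =
    rw.EnterReadLock()
    try
        try
            rw.EnterWriteLock()
            rw.ExitWriteLock()
            false
        with :? LockRecursionException -> true
    finally
        rw.ExitReadLock()

// Release the read lock first (what releaseIfEnsured does); now the write lock is granted.
let writeAfterRelease =
    rw.EnterWriteLock()
    try rw.IsWriteLockHeld
    finally rw.ExitWriteLock()
```

Releasing first opens a window in which another transaction can commit, which is exactly why lock re-checks currentPoint against readPoint after acquiring the write lock.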
The next thing is to try to get a write lock. If that times out, an exception is thrown. The unlocked flag controls whether we need to release the write lock on our way out of the method.
Now we see some checks that are starting to look familiar. If the Ref’s value in the real world has been set since the start of our attempt, retry. Next, check the stamp on the Ref. If there is a stamp, the stamper is running, and it is not us, we have some work to do (next paragraph). Otherwise, put our stamp on it and return the current value. (Why return the current value? It just gets ignored. I don’t know.)
If there is a running stamper that is not us, well, one of us has got to go. We try to barge the other guy. If we succeed, all is good: declare success. If we cannot barge the other guy, then we have to kill our attempt and retry: block-and-bail.
One little subtlety here. Note that if we don’t barge, we release the write lock (and set the indicator flag) before calling block-and-bail, even though the finally clause would otherwise release the write lock for us. This gives other threads a chance to get a lock (read or write) before we finish the block-and-bail. We’re just being polite.
Barging is relatively straightforward.
member private this.barge(refinfo: LTInfo) =
    let mutable barged = false
    // if this transaction is older, try to abort the other
    if this.bargeTimeElapsed && startPoint < refinfo.StartPoint then
        barged <- refinfo.compareAndSet(LTState.Running, LTState.Killed)
        if barged then
            refinfo.Latch.CountDown()
    barged
First, we don’t try to barge unless we have been around for a while. A little patience is called for.
// How long our transaction must have been running before it may 'barge' another.
// Java version has BARGE_WAIT_NANOS, set at 10*1_000_000, i.e., 10 milliseconds.
// startTime comes from Environment.TickCount, which counts milliseconds,
// so the equivalent threshold here is simply 10.
[<Literal>]
let BargeWaitMsecs = 10L

// Determine if sufficient clock time has elapsed to barge another transaction.
member private _.bargeTimeElapsed = (int64 Environment.TickCount) - startTime > BargeWaitMsecs
Second, we only barge the other transaction if we are older. This compares start points instead of read points, so it dates back to the first attempt of each transaction, not the current retry of each. (To determine who is older, look at dates of birth, not at who had the most recent birthday.) The compareAndSet uses an Interlocked method. Why might it fail? The other transaction was running just before we got here, but between then and now someone else may have barged it, or it may already be committing, have committed, or somehow been aborted. In those cases, even if it might be okay to proceed, it’s too hard to tell, so let’s just retry (via the block-and-bail that follows a failed barge attempt). If we do barge, we count down the latch on the other transaction’s LTInfo on our way out. More on this in a minute (actually, 100 milliseconds).
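The barge decision boils down to three conditions checked in order: patience, seniority, and winning the compare-and-set. Here is a sketch of just that decision. OtherTx and tryBarge are hypothetical names, the state codes are raw int64 values, and the elapsed time is passed in as a parameter rather than read from a clock so the example is deterministic.

```fsharp
open System.Threading

// Hypothetical state codes, stored as raw int64 values as LTInfo does.
let running = 1L
let killed = 4L

// Hypothetical stand-in for the other transaction's LTInfo.
type OtherTx(startPoint: int64) =
    let mutable status = running
    member _.StartPoint = startPoint
    member _.Status = Interlocked.Read(&status)
    // Move Running -> Killed, but only if nobody changed the state first.
    member _.TryKill() =
        Interlocked.CompareExchange(&status, killed, running) = running

// Barge only if we have waited long enough, we are older (smaller start point), and the CAS wins.
let tryBarge (myStartPoint: int64) (elapsedMsecs: int64) (bargeWaitMsecs: int64) (other: OtherTx) =
    elapsedMsecs > bargeWaitMsecs
    && myStartPoint < other.StartPoint
    && other.TryKill()
```

Because && short-circuits, an impatient or younger transaction never touches the other transaction's state; only an older, patient one attempts the compare-and-set.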
The block-and-bail operation is short:
member private this.blockAndBail(refinfo: LTInfo) =
    this.stop(LTState.Retry)
    try
        refinfo.Latch.Await(LockWaitMsecs) |> ignore
    with
    | :? ThreadInterruptedException -> ()
    raise retryEx
The stop call does some cleanup to get us ready for a retry:
// Stop this transaction.
member private this.stop(state: LTState) =
    match info with
    | None -> ()
    | Some sinfo ->
        lock sinfo (fun () ->
            sinfo.State <- state
            sinfo.Latch.CountDown())
        info <- None
        vals.Clear()
        sets.Clear()
        commutes.Clear()
Next we wait (at least for a little bit) for the other transaction’s latch to count down.
Why wait? We just decided to block-and-bail. This only happens when we’ve run into a conflict and lost the battle.
If we barreled through and started our retry immediately, the other transaction might not have had time to finish and stop running (either completely or just its current attempt), and we’d just collide with it again. So why not simply sleep for a fixed time? Why is the latch involved?
Well, if the other transaction gets barged or transitions itself to a new state (this happens in stop), the latch counts down. That can release us to proceed sooner than the full LockWaitMsecs.
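Here is the "wait, but not longer than necessary" behavior in isolation. The post's CountdownLatch comes from its own support code; this sketch assumes .NET's System.Threading.CountdownEvent as a stand-in, with Signal playing the role of CountDown and Wait(timeout) the role of Await. The names winner, lockWaitMsecs, and released are illustrative.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// CountdownEvent stands in for the post's CountdownLatch (an assumption, not the post's class).
let latch = new CountdownEvent(1)
// Hypothetical cap on the wait; generous here so the sketch is not timing-sensitive.
let lockWaitMsecs = 2000

// The "winner" finishes its attempt (or gets barged) shortly and counts the latch down.
let winner =
    Task.Run(Action(fun () ->
        Thread.Sleep(10)
        latch.Signal() |> ignore))

// The "loser" in block-and-bail waits at most lockWaitMsecs, but wakes as soon as the latch hits zero.
let released = latch.Wait(lockWaitMsecs)
winner.Wait()
printfn "released before the timeout: %b" released
```

If nobody ever counts down, Wait simply returns false after the timeout, and the loser retries anyway; the latch only ever shortens the wait.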
Do not fear commitment
We’re almost done. Back in run, we left a hole to fill: // CODE TO COMMIT COMES HERE.
if newLTInfo.compareAndSet(LTState.Running, LTState.Committing) then
    for pair in commutes do
        let r = pair.Key
        if sets.Contains(r) then
            ()
        else
            let wasEnsured = ensures.Contains(r)
            // can't upgrade read lock, so release
            this.releaseIfEnsured(r)
            this.tryWriteLock(r)
            locked.Add(r)
            if wasEnsured && r.currentPoint() > readPoint then
                raise retryEx
            match r.TxInfo with
            | Some refinfo when refinfo <> newLTInfo && refinfo.isRunning ->
                if not (this.barge(refinfo)) then
                    raise retryEx
            | _ -> ()
            let v = r.currentVal()
            vals.[r] <- v
            for f in pair.Value do
                vals.[r] <- f.fn.applyTo(RTSeq.cons(vals.[r], f.args))
    for r in sets do
        this.tryWriteLock(r)
        locked.Add(r)
    // at this point, all values calced, all refs to be written locked
    // no more client code to be called
    let commitPoint = LockingTransaction.getCommitPoint()
    for pair in vals do
        let r = pair.Key
        let oldval = r.currentVal()   // would be passed to watchers, which we are leaving out
        let newval = pair.Value
        r.setValue(newval, commitPoint)
    finished <- true
    newLTInfo.set(LTState.Committed) |> ignore
We try to set our state from Running to Committing; if this fails, we got barged and we don’t try to commit.
For each commuted Ref that we have not also set, we release its read lock if it was ensured, get a write lock, retry if it was ensured and someone has committed to it since our read point, and try to barge any other running stamper. Then we apply all the commute computations in succession to the Ref’s current value, ending up with a final in-transaction value for the Ref. Then we get write locks on all the Refs in sets.
At this point, all the values are calculated and stored in the vals map, and we hold write locks on every Ref we are about to change. We take a commit point and set the values on all the Refs in the real world. Because we hold the write locks, no one will see any of these changes until we are done. At the end, we change our state to Committed.
Last and definitely not least: // CODE TO CLEAN UP AFTER EACH ATTEMPT, whether successful or not.
This code is in a finally block. Whether we were killed, failed in some other way, or committed, we have to release any locks we might be holding. Those are the write locks recorded in the locked collection (acquired only if we got as far as the commit) and the read locks on the ensured Refs (whether we committed or are retrying).
for k = locked.Count - 1 downto 0 do
    locked.[k].exitWriteLock()
locked.Clear()
for r in ensures do
    r.exitReadLock()
ensures.Clear()
this.stop(if finished then LTState.Committed else LTState.Retry)
And that, I hope, suffices.
Onward?
If you have the stomach and the stamina, there is a little bonbon awaiting. Something simple and refreshing.