Dec 23, 2010

Optimistic Concurrency with C#

There's lots of ways to get Optimistic Concurrency with your database calls. I like row versions to guarantee exclusivity. You could use a database lock, but that's expensive and shouldn't be necessary where data contention is low. Don't use update times since the system time is rather course.

With this class, we expect that most of the time the optimistic lock will work. However, we will be relying on other data access clients to 'obey the rules'.

This data access class accesses only one table and relies on the version to change when a row modification is made. There's only one modification made here, but this simple class is easily expanded.

The lazy class ConcurrencyObject is just used to shuffle data around. It hides implementation from everyone other than the assembly it's created in. Useful if you separate the data access agents from the other parts of your code with libraries.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;

namespace OcDac
{
    public class OCDataAgent
    {
        private readonly SqlConnection myConnection;
        private readonly SqlCommand selectPendingObjectCommand;
        private readonly SqlCommand updatePendingObjectCommand;
        public OCDataAgent()
        {
            myConnection = new SqlConnection();
            // build the connection string
            SqlConnectionStringBuilder bu = new SqlConnectionStringBuilder();
            bu.DataSource = "tcp:10.0.0.114, 1370";
            bu.InitialCatalog = "ocSample";
            bu.IntegratedSecurity = false; // Sql Server Authentification
            bu.UserID = "sa";
            bu.Password = "youwish";
            myConnection.ConnectionString = bu.ConnectionString;

            // This stmt is used to get the ID and Version of one object that's pending
            selectPendingObjectCommand = new SqlCommand { Connection = myConnection };
            selectPendingObjectCommand.CommandText = "SELECT TOP 1 ID, VERSION FROM OnlyTable WHERE STATUS='PENDING'";

            // Claim this object if it's row number hasn't been modified and has the correct ID
            updatePendingObjectCommand = new SqlCommand { Connection = myConnection };
            updatePendingObjectCommand.CommandText = "UPDATE OnlyTable SET STATUS='IN_PROCESS', VERSION=@new_version WHERE ID=@id AND VERSION=@version";
            updatePendingObjectCommand.Parameters.Add("@id", System.Data.SqlDbType.BigInt);
            updatePendingObjectCommand.Parameters.Add("@version", System.Data.SqlDbType.BigInt);
            updatePendingObjectCommand.Parameters.Add("@new_version", System.Data.SqlDbType.BigInt);

        }

        public void LockPendingObject(ConcurrencyObject co)
        {
            if (myConnection.State == System.Data.ConnectionState.Closed)
                OpenConnection();

            updatePendingObjectCommand.Parameters[0].Value = co.Id;
            updatePendingObjectCommand.Parameters[1].Value = co.Version;            
            updatePendingObjectCommand.Parameters[2].Value = co.Id + 1;
            
            // If the claim succeeded, then we can update our co otherwise exception
            var rowsAffected = updatePendingObjectCommand.ExecuteNonQuery();
            if (rowsAffected != 0)
                co.Id++;
            else 
                throw new Exception("Could not lock object using OC");
        }

        public ConcurrencyObject GetPendingObject()
        {
            if (myConnection.State == System.Data.ConnectionState.Closed)
                OpenConnection();
            var co = new ConcurrencyObject();

            var reader = selectPendingObjectCommand.ExecuteReader();

            while (reader.Read())
            {
                co.Id = reader.GetInt64(0);
                co.Version = reader.GetInt64(1);
            }
            reader.Close();
            return co;
        }

        private void OpenConnection()
        {
            myConnection.Open();
        }
        
    }    

    public class ConcurrencyObject 
    {
        internal Int64 Version { get; set; }
        internal Int64 Id { get; set; }
    }
}