Oracle Persistence Service for Windows Workflow Foundation 3.5

guptashail on the WF forum posted this implementation of an Oracle persistence service.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;
using System.Workflow.ComponentModel;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Data;
using System.Configuration;
using System.IO;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Collections.Specialized;
using System.Transactions;
namespace OracleWorkflowDemoService
{
    class OraclePersistenceService : WorkflowPersistenceService, IPendingWork
    {
        private readonly object _lockTimer;
        private readonly TimeSpan _loadInterval;
        private Timer _loadIntervalTimer;

        public OraclePersistenceService(NameValueCollection parameters)
            : this(parameters["UnloadOnIdle"] != null ? bool.Parse(parameters["UnLoadOnIdle"]) : false,
                parameters["InstanceOwnershipDuration"] != null ? int.Parse(parameters["InstanceOwnershipDuration"]) : 0,
                parameters["LoadInterval"] != null ? int.Parse(parameters["LoadInterval"]) : 0)
        {
        }

        protected OraclePersistenceService(Boolean unloadOnIdle, int instanceOwnershipDuration, int loadInterval)
        {
            _lockTimer = new object();
            _loadInterval = new TimeSpan(0,0,loadInterval);
        }       

        /// <summary>
        /// Starts the service by delegating to the base implementation; no additional work is done here.
        /// </summary>
        protected override void Start()
        {
            base.Start();
        }

        protected override void OnStarted()
        {
            try
            {
                base.OnStarted();

                if (_loadInterval > TimeSpan.Zero)
                {
                    lock (_lockTimer)
                    {
                        _loadIntervalTimer = new Timer(LoadExpiredWorkflows, null, _loadInterval, _loadInterval);
                    }
                }

            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        /// <summary>
        /// Delegates to the base implementation; no additional work is done here.
        /// </summary>
        protected override void OnStopped()
        {
            base.OnStopped();
        }

        protected override void Stop()
        {
            base.Stop();
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="scopeId"></param>
        /// <param name="outerActivity"></param>
        /// <returns></returns>
        protected override Activity LoadCompletedContextActivity(Guid scopeId, Activity outerActivity)
        {
            //get the workflow instance ID
            Guid instanceId = WorkflowEnvironment.WorkflowInstanceId;
            Activity activity = Deserialize(instanceId, scopeId, outerActivity);
            if (activity == null)
            {
                RaiseException(instanceId,
                "Unable to deserialize activity", null);
            }
            return activity;
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="instanceId"></param>
        /// <returns></returns>
        protected override Activity LoadWorkflowInstanceState(Guid instanceId)
        {
            Activity activity = Deserialize(instanceId, Guid.Empty, null);
            if (activity == null)
            {
                RaiseException(instanceId, "Unable to deserialize workflow", null);
            }
            return activity;
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="activity"></param>
        protected override void SaveCompletedContextActivity(Activity activity)
        {
            WorkItem workItem = new WorkItem();

            workItem.activity = activity;
            workItem.contextId = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty);
            workItem.instanceId = WorkflowEnvironment.WorkflowInstanceId;
            workItem.State = WorkItem.workflowState.ActiveInstance;

            WorkflowEnvironment.WorkBatch.Add(this, workItem);
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="rootActivity"></param>
        /// <param name="unlock"></param>
        protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock)
        {
            //get the workflow instance ID
            Guid WFinstanceId = WorkflowEnvironment.WorkflowInstanceId;

            //determine the status of the workflow
            WorkflowStatus status = WorkflowPersistenceService.GetWorkflowStatus(rootActivity);
            WorkItem workItem = new WorkItem();
            workItem.activity = rootActivity;
            workItem.instanceId = WFinstanceId;
            workItem.contextId = Guid.Empty;

            switch (status)
            {
                case WorkflowStatus.Completed:
                case WorkflowStatus.Terminated:
                    workItem.State = WorkItem.workflowState.Completed;
                    break;
                default:
                    workItem.State = WorkItem.workflowState.ActiveInstance;
                    break;
            }

            WorkflowEnvironment.WorkBatch.Add(this, workItem);
        }

        /// <summary>
        /// Reports whether an idle workflow should be unloaded. This implementation
        /// answers true unconditionally.
        /// </summary>
        /// <param name="activity">The idle workflow's activity; not inspected.</param>
        /// <returns>Always true.</returns>
        protected override bool UnloadOnIdle(Activity activity)
        {
            // Always unload on idle; the argument is not consulted.
            return true;
        }

        /// <summary>
        /// Intentionally empty: this implementation performs no action when asked to unlock instance state.
        /// </summary>
        /// <param name="rootActivity">Root activity of the workflow instance; not used.</param>
        protected override void UnlockWorkflowInstanceState(Activity rootActivity)
        {

        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="instanceId"></param>
        /// <param name="contextId"></param>
        /// <param name="activity"></param>
        private void Serialize(Guid instanceId, Guid contextId, Activity activity)
        {
            OracleConnection conn = new OracleConnection(ConfigurationManager.ConnectionStrings["OraclePersistenceService"].ConnectionString);
            try
            {
                string commandtext = "";
                string result = "";

                OracleCommand comd = new OracleCommand();
                comd.Connection = conn;
                comd.BindByName = true;

                TimerEventSubscriptionCollection timerEvent =
                    (TimerEventSubscriptionCollection)activity.GetValue(
                    TimerEventSubscriptionCollection.TimerCollectionProperty);

                DateTime nexttimer = new DateTime();

                if (timerEvent != null)
                {
                    TimerEventSubscription timerEventSubscription = timerEvent.Peek();
                    if (timerEventSubscription != null)
                        nexttimer = timerEventSubscription.ExpiresAt;
                }
                             
               
                OracleParameter param1 = comd.Parameters.Add("instanceid", OracleDbType.Char, 36);
                param1.Direction = ParameterDirection.Input;
                param1.Value = instanceId.ToString();

                OracleParameter param2 = comd.Parameters.Add("state", OracleDbType.Blob);
                param2.Direction = ParameterDirection.Input;
                param2.Value = GetDefaultSerializedForm(activity);

                OracleParameter param3 = comd.Parameters.Add("nexttimer", OracleDbType.TimeStamp);
                param3.Direction = ParameterDirection.Input;
                param3.Value = nexttimer.ToLocalTime();

                commandtext = "Select uidInstanceId from InstanceState where uidInstanceId = '" + instanceId.ToString() + "'";
                comd.CommandText = commandtext;

                conn.Open();

                result = (string)comd.ExecuteScalar();

                if (result == null)
                {
                    commandtext = " Insert into InstanceState (uidInstanceID,state,Modified,NextTimer) VALUES (:instanceid, :state,sysdate,:nexttimer) ";
                }
                else
                {
                    commandtext = " Update InstanceState set state = :state, Modified = sysdate, NextTimer = :nexttimer where uidInstanceId = :instanceid  ";
                }

                comd.CommandText = commandtext;               
                comd.ExecuteNonQuery();
             

                if (!contextId.Equals(Guid.Empty))
                {
                    commandtext = "Select contextId from ContextState where contextId = '" + contextId.ToString() + "'";
                    comd.CommandText = commandtext;
                    result = (string)comd.ExecuteScalar();

                    if (result == null)
                    {
                        commandtext = " Insert into ContextState (uidInstanceID,contextId,state,Modified,ActivityName) VALUES (:instanceid, :contextid, :state, sysdate,'" + activity.Name + "') ";
                    }
                    else
                    {
                        commandtext = " Update ContextState set state = :state, Modified = sysdate where uidInstanceId = :instanceid and contextId = :contextid ";
                    }

                    comd.Parameters.Remove(param3);
                    OracleParameter param4 = comd.Parameters.Add("contextid", OracleDbType.Char, 36);
                    param4.Direction = ParameterDirection.Input;
                    param4.Value = contextId.ToString();

                    comd.CommandText = commandtext;
                    comd.ExecuteNonQuery();

                }
                   

            }
            catch (Exception ex)
            {
                RaiseException(instanceId,
                "Exception in serialization", ex);
            }
            finally
            {
                if (conn.State == ConnectionState.Open)
                    conn.Close();

            }
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="instanceId"></param>
        /// <param name="contextId"></param>
        /// <param name="rootActivity"></param>
        /// <returns></returns>
        private Activity Deserialize(Guid instanceId, Guid contextId, Activity rootActivity)
        {
            Activity activity = null;
            OracleConnection conn = new OracleConnection(ConfigurationManager.ConnectionStrings["OraclePersistenceService"].ConnectionString);
            try
            {

                OracleCommand comd = new OracleCommand();
                string commandtext = "";

                comd.Connection = conn;
                comd.BindByName = true;

                OracleParameter param1 = comd.Parameters.Add("state", OracleDbType.Blob);
                param1.Direction = ParameterDirection.Output;

                OracleParameter param2 = comd.Parameters.Add("instanceid", OracleDbType.Char, 36);
                param2.Direction = ParameterDirection.Input;
                param2.Value = instanceId.ToString();

                if (!contextId.Equals(Guid.Empty))
                {
                    commandtext = "Begin SELECT state into :state from ContextState WHERE uidInstanceId = :instanceid and contextId = :contextid; END;";
                    OracleParameter param3 = comd.Parameters.Add("contextid", OracleDbType.Char, 36);
                    param3.Direction = ParameterDirection.Input;
                    param3.Value = contextId.ToString();
                }
                else
                {
                    commandtext = "Begin SELECT state into :state from InstanceState WHERE uidInstanceId = :instanceid; END;";
                }
                              
                comd.CommandText = commandtext;            
               
                conn.Open();
                comd.ExecuteNonQuery();

                activity = RestoreFromDefaultSerializedForm((byte[])((OracleBlob)(comd.Parameters[0].Value)).Value, rootActivity);               
               
            }
            catch (Exception ex)
            {
                RaiseException(instanceId,
                "Exception in deserialization", ex);
            }
            finally
            {
                if (conn.State == ConnectionState.Open)
                    conn.Close();

            }
            return activity;
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="instanceId"></param>
        private void DeleteWorkflow(Guid instanceId)
        {

            OracleConnection conn = new OracleConnection(ConfigurationManager.ConnectionStrings["OraclePersistenceService"].ConnectionString);
            try
            {
                OracleCommand comd = new OracleCommand(String.Concat("Delete InstanceState where uidInstanceId = '", instanceId.ToString(), "'"), conn);

                conn.Open();

                comd.ExecuteNonQuery();

                comd.CommandText = String.Concat("Delete contextstate where uidInstanceId = '", instanceId.ToString(), "'");
                comd.ExecuteNonQuery();

            }
            catch (Exception e)
            {
                RaiseException(instanceId,
                "Exception in deletion", e);
            }
            finally
            {
                if (conn.State == ConnectionState.Open)
                    conn.Close();

            }
        }

        private void RaiseException(Guid instanceId, String message, Exception ex)
        {
            if (ex == null)
            {
                throw new PersistenceException(
                String.Format("Workflow: {0} Error: {1}",
                instanceId, message));
            }
            else
            {
                throw new PersistenceException(
                String.Format("Workflow: {0} Error: {1}: Inner: {2}",
                instanceId, message, ex.Message), ex);
            }
        }

        #region IPendingWork Members

        public void Commit(Transaction transaction, System.Collections.ICollection items)
        {
            try
            {
                foreach (WorkItem item in items)
                {
                    if (item.State == WorkItem.workflowState.ActiveInstance)
                        Serialize(item.instanceId, item.contextId, item.activity);
                    else
                        DeleteWorkflow(item.instanceId);

                }
            }
            catch (Exception ex)
            {
                RaiseException(Guid.Empty, "Exception in commit", ex);
            }
        }

        /// <summary>
        /// Intentionally empty: no action is taken when the batch finishes, whether it succeeded or not.
        /// </summary>
        /// <param name="succeeded">Outcome of the batch; not used.</param>
        /// <param name="items">Work items of the batch; not used.</param>
        public void Complete(bool succeeded, System.Collections.ICollection items)
        {

        }

        /// <summary>
        /// Reports whether the given work items must be committed. This implementation answers true unconditionally.
        /// </summary>
        /// <param name="items">Pending work items; not inspected.</param>
        /// <returns>Always true.</returns>
        public bool MustCommit(System.Collections.ICollection items)
        {
            return true;
        }

        #endregion
      

        /// <summary>
        ///
        /// </summary>
        /// <param name="state"></param>
        private void LoadExpiredWorkflows(object state)
        {
            lock (_lockTimer)
            {
                if (State != WorkflowRuntimeServiceState.Started)
                    return;
               
                OracleConnection conn = new OracleConnection(ConfigurationManager.ConnectionStrings["OraclePersistenceService"].ConnectionString);
                try
                {
                    OracleCommand comd = new OracleCommand("Select uidInstanceId from InstanceState where nexttimer < sysdate ", conn);

                    conn.Open();

                    OracleDataReader reader = comd.ExecuteReader();

                    while (reader.Read())
                    {
                        Runtime.GetWorkflow(new Guid(reader["uidInstanceId"].ToString())).Load();
                    }

                }
                catch (Exception ex)
                {
                    RaiseException(Guid.Empty, "Exception in loading expired workflows", ex);
                }
                finally
                {
                    if (conn.State == ConnectionState.Open)
                        conn.Close();
                }               
            }
        }
    }
     

    class WorkItem
    {
        public Activity activity;
        public Guid instanceId;
        public Guid contextId;
        public workflowState State;

        public enum workflowState
        {
            /// <summary>
            /// Pending work item is for a workflow instance.
            /// </summary>
            ActiveInstance,
            /// <summary>
            /// Pending work item is for a completed scope.
            /// </summary>
            Completed
        }
   
    }
}

Tags: , ,

Leave a Reply