guptashail on the WF forum posted this implementation of an Oracle persistence service.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;
using System.Workflow.ComponentModel;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Data;
using System.Configuration;
using System.IO;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Collections.Specialized;
using System.Transactions;
namespace OracleWorkflowDemoService
{
class OraclePersistenceService : WorkflowPersistenceService, IPendingWork
{
private readonly object _lockTimer;
private readonly TimeSpan _loadInterval;
private Timer _loadIntervalTimer;
/// <summary>
/// Creates the service from a name/value parameter collection.
/// Recognised keys are "UnloadOnIdle" (parsed with bool.Parse), "InstanceOwnershipDuration"
/// and "LoadInterval" (both parsed with int.Parse). A key that is absent falls back to
/// false / 0 / 0 respectively.
/// </summary>
/// <param name="parameters">Configuration values; must not be null.</param>
public OraclePersistenceService(NameValueCollection parameters)
    // The same key spelling is used for the null check and for the parse. The original checked
    // "UnloadOnIdle" but parsed "UnLoadOnIdle", which only works for case-insensitive collections.
    : this(parameters["UnloadOnIdle"] != null ? bool.Parse(parameters["UnloadOnIdle"]) : false,
           parameters["InstanceOwnershipDuration"] != null ? int.Parse(parameters["InstanceOwnershipDuration"]) : 0,
           parameters["LoadInterval"] != null ? int.Parse(parameters["LoadInterval"]) : 0)
{
}
/// <summary>
/// Initialises the timer lock object and the polling interval.
/// </summary>
/// <param name="unloadOnIdle">Accepted but not stored by this constructor.</param>
/// <param name="instanceOwnershipDuration">Accepted but not stored by this constructor.</param>
/// <param name="loadInterval">Polling interval in whole seconds.</param>
protected OraclePersistenceService(Boolean unloadOnIdle, int instanceOwnershipDuration, int loadInterval)
{
    // Whole seconds -> TimeSpan.
    _loadInterval = TimeSpan.FromSeconds(loadInterval);
    _lockTimer = new object();
}
/// <summary>
/// Starts the service; delegates to the base implementation.
/// </summary>
protected override void Start()
{
    base.Start();
}
/// <summary>
/// Runs the base OnStarted logic and, when a positive load interval was configured,
/// creates the timer that periodically invokes LoadExpiredWorkflows.
/// </summary>
protected override void OnStarted()
{
    // No try/catch: the original caught every exception only to "throw ex", which
    // discarded the original stack trace without adding any handling.
    base.OnStarted();
    if (_loadInterval > TimeSpan.Zero)
    {
        lock (_lockTimer)
        {
            // First tick after one interval, then repeating at the same interval.
            _loadIntervalTimer = new Timer(LoadExpiredWorkflows, null, _loadInterval, _loadInterval);
        }
    }
}
/// <summary>
/// Delegates to the base OnStopped implementation.
/// </summary>
protected override void OnStopped()
{
    base.OnStopped();
}
/// <summary>
/// Stops the service and releases the polling timer created in OnStarted, if any.
/// </summary>
protected override void Stop()
{
    base.Stop();
    // Taking the same lock as the timer callback means an in-flight callback finishes
    // before the timer is disposed.
    lock (_lockTimer)
    {
        if (_loadIntervalTimer != null)
        {
            _loadIntervalTimer.Dispose();
            _loadIntervalTimer = null;
        }
    }
}
/// <summary>
/// Restores a completed context activity for the current workflow instance by calling
/// Deserialize with the given scope id.
/// </summary>
/// <param name="scopeId">Context id passed to Deserialize as the context key.</param>
/// <param name="outerActivity">Activity handed to Deserialize as the root for restoration.</param>
/// <returns>The restored activity.</returns>
/// <exception cref="PersistenceException">Raised via RaiseException when Deserialize yields null.</exception>
protected override Activity LoadCompletedContextActivity(Guid scopeId, Activity outerActivity)
{
    // The instance id comes from the ambient workflow environment.
    Guid workflowId = WorkflowEnvironment.WorkflowInstanceId;
    Activity restored = Deserialize(workflowId, scopeId, outerActivity);
    if (restored != null)
    {
        return restored;
    }
    RaiseException(workflowId, "Unable to deserialize activity", null);
    return restored;
}
/// <summary>
/// Restores the root activity of a workflow instance by calling Deserialize with an
/// empty context id and no outer activity.
/// </summary>
/// <param name="instanceId">Id of the workflow instance to load.</param>
/// <returns>The restored root activity.</returns>
/// <exception cref="PersistenceException">Raised via RaiseException when Deserialize yields null.</exception>
protected override Activity LoadWorkflowInstanceState(Guid instanceId)
{
    Activity root = Deserialize(instanceId, Guid.Empty, null);
    if (root != null)
    {
        return root;
    }
    RaiseException(instanceId, "Unable to deserialize workflow", null);
    return root;
}
/// <summary>
/// Queues a work item for a completed context activity on the current work batch.
/// Nothing is written here; the item is handled later by Commit.
/// </summary>
/// <param name="activity">The completed context activity; its ActivityContextGuid value becomes the item's context id.</param>
protected override void SaveCompletedContextActivity(Activity activity)
{
    WorkItem pending = new WorkItem
    {
        activity = activity,
        contextId = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty),
        instanceId = WorkflowEnvironment.WorkflowInstanceId,
        State = WorkItem.workflowState.ActiveInstance
    };
    WorkflowEnvironment.WorkBatch.Add(this, pending);
}
/// <summary>
/// Queues a work item for the root activity on the current work batch. A workflow whose
/// status is Completed or Terminated is queued in the Completed state; every other status
/// is queued as ActiveInstance.
/// </summary>
/// <param name="rootActivity">Root activity of the workflow instance.</param>
/// <param name="unlock">Not used by this implementation.</param>
protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock)
{
    Guid workflowId = WorkflowEnvironment.WorkflowInstanceId;
    WorkflowStatus currentStatus = WorkflowPersistenceService.GetWorkflowStatus(rootActivity);

    bool finished = currentStatus == WorkflowStatus.Completed
                 || currentStatus == WorkflowStatus.Terminated;

    WorkItem pending = new WorkItem
    {
        activity = rootActivity,
        instanceId = workflowId,
        // An empty context id marks the item as a root (instance-level) save.
        contextId = Guid.Empty,
        State = finished ? WorkItem.workflowState.Completed : WorkItem.workflowState.ActiveInstance
    };
    WorkflowEnvironment.WorkBatch.Add(this, pending);
}
/// <summary>
/// Tells the caller whether an idle workflow should be unloaded.
/// This implementation answers true unconditionally.
/// </summary>
/// <param name="activity">Not inspected.</param>
/// <returns>Always true.</returns>
protected override bool UnloadOnIdle(Activity activity)
{
    // The answer does not depend on the activity or on any configuration value.
    return true;
}
/// <summary>
/// Intentionally empty: this implementation performs no action when asked to unlock
/// a workflow instance.
/// </summary>
/// <param name="rootActivity">Not used.</param>
protected override void UnlockWorkflowInstanceState(Activity rootActivity)
{
    // Nothing to do.
}
/// <summary>
/// Writes the serialized form of <paramref name="activity"/> to the InstanceState table
/// (insert when no row exists for the instance, update otherwise) and, when
/// <paramref name="contextId"/> is not Guid.Empty, also to the ContextState table.
/// </summary>
/// <param name="instanceId">Workflow instance id; stored in the uidInstanceId column.</param>
/// <param name="contextId">Context id of a completed scope, or Guid.Empty for an instance-level save.</param>
/// <param name="activity">Activity that is serialized with GetDefaultSerializedForm.</param>
/// <exception cref="PersistenceException">
/// Raised through RaiseException for any failure that occurs once the connection object exists.
/// </exception>
private void Serialize(Guid instanceId, Guid contextId, Activity activity)
{
    // "using" guarantees the connection is closed and disposed on every path.
    using (OracleConnection conn = new OracleConnection(ConfigurationManager.ConnectionStrings["OraclePersistenceService"].ConnectionString))
    {
        try
        {
            TimerEventSubscriptionCollection timerEvent =
                (TimerEventSubscriptionCollection)activity.GetValue(
                    TimerEventSubscriptionCollection.TimerCollectionProperty);

            // NOTE(review): when there is no pending timer this stays at default(DateTime), so the
            // stored NextTimer is far in the past and "nexttimer < sysdate" in LoadExpiredWorkflows
            // matches the row on every poll. Kept as-is; confirm whether a far-future sentinel or
            // NULL is intended for the schema in use.
            DateTime nexttimer = new DateTime();
            if (timerEvent != null)
            {
                TimerEventSubscription timerEventSubscription = timerEvent.Peek();
                if (timerEventSubscription != null)
                {
                    nexttimer = timerEventSubscription.ExpiresAt;
                }
            }

            byte[] serializedState = GetDefaultSerializedForm(activity);

            conn.Open();

            // NOTE(review): the InstanceState row is written even when this call is for a completed
            // context activity (contextId != Guid.Empty), in which case "state" holds that context
            // activity's bytes. Behaviour preserved from the original; verify it is intended.
            bool instanceRowExists = RowExists(
                conn,
                "Select uidInstanceId from InstanceState where uidInstanceId = :instanceid",
                "instanceid",
                instanceId);
            string instanceSql = instanceRowExists
                ? " Update InstanceState set state = :state, Modified = sysdate, NextTimer = :nexttimer where uidInstanceId = :instanceid "
                : " Insert into InstanceState (uidInstanceID,state,Modified,NextTimer) VALUES (:instanceid, :state,sysdate,:nexttimer) ";
            using (OracleCommand instanceCommand = CreateBoundCommand(conn, instanceSql))
            {
                AddGuidParameter(instanceCommand, "instanceid", instanceId);
                AddStateParameter(instanceCommand, serializedState);
                OracleParameter timerParam = instanceCommand.Parameters.Add("nexttimer", OracleDbType.TimeStamp);
                timerParam.Direction = ParameterDirection.Input;
                timerParam.Value = nexttimer.ToLocalTime();
                instanceCommand.ExecuteNonQuery();
            }

            if (!contextId.Equals(Guid.Empty))
            {
                bool contextRowExists = RowExists(
                    conn,
                    "Select contextId from ContextState where contextId = :contextid",
                    "contextid",
                    contextId);
                string contextSql = contextRowExists
                    ? " Update ContextState set state = :state, Modified = sysdate where uidInstanceId = :instanceid and contextId = :contextid "
                    : " Insert into ContextState (uidInstanceID,contextId,state,Modified,ActivityName) VALUES (:instanceid, :contextid, :state, sysdate, :activityname) ";
                using (OracleCommand contextCommand = CreateBoundCommand(conn, contextSql))
                {
                    AddGuidParameter(contextCommand, "instanceid", instanceId);
                    AddGuidParameter(contextCommand, "contextid", contextId);
                    AddStateParameter(contextCommand, serializedState);
                    if (!contextRowExists)
                    {
                        // Bound instead of concatenated so a quote in the activity name cannot
                        // break or alter the statement.
                        OracleParameter nameParam = contextCommand.Parameters.Add("activityname", OracleDbType.Varchar2);
                        nameParam.Direction = ParameterDirection.Input;
                        nameParam.Value = activity.Name;
                    }
                    contextCommand.ExecuteNonQuery();
                }
            }
        }
        catch (Exception ex)
        {
            RaiseException(instanceId,
                "Exception in serialization", ex);
        }
    }
}
/// <summary>
/// Creates a command on <paramref name="conn"/> that binds parameters by name.
/// </summary>
private static OracleCommand CreateBoundCommand(OracleConnection conn, string commandText)
{
    OracleCommand command = new OracleCommand(commandText, conn);
    command.BindByName = true;
    return command;
}
/// <summary>
/// Adds a 36-character CHAR input parameter holding the string form of a Guid.
/// </summary>
private static void AddGuidParameter(OracleCommand command, string name, Guid value)
{
    OracleParameter parameter = command.Parameters.Add(name, OracleDbType.Char, 36);
    parameter.Direction = ParameterDirection.Input;
    parameter.Value = value.ToString();
}
/// <summary>
/// Adds the BLOB input parameter named "state" that carries the serialized activity.
/// </summary>
private static void AddStateParameter(OracleCommand command, byte[] serializedState)
{
    OracleParameter parameter = command.Parameters.Add("state", OracleDbType.Blob);
    parameter.Direction = ParameterDirection.Input;
    parameter.Value = serializedState;
}
/// <summary>
/// Runs a single-key lookup and reports whether it returned a row.
/// </summary>
private static bool RowExists(OracleConnection conn, string commandText, string parameterName, Guid key)
{
    using (OracleCommand lookup = CreateBoundCommand(conn, commandText))
    {
        AddGuidParameter(lookup, parameterName, key);
        // ExecuteScalar returns null when the query produces no rows.
        return lookup.ExecuteScalar() != null;
    }
}
/// <summary>
/// Reads a serialized activity and restores it with RestoreFromDefaultSerializedForm.
/// When <paramref name="contextId"/> is not Guid.Empty the state is read from ContextState
/// (keyed by instance id and context id); otherwise it is read from InstanceState.
/// </summary>
/// <param name="instanceId">Workflow instance id to look up.</param>
/// <param name="contextId">Context id of a completed scope, or Guid.Empty for the instance state.</param>
/// <param name="rootActivity">Passed through to RestoreFromDefaultSerializedForm.</param>
/// <returns>The restored activity.</returns>
/// <exception cref="PersistenceException">
/// Raised through RaiseException for any failure that occurs once the connection object exists.
/// </exception>
private Activity Deserialize(Guid instanceId, Guid contextId, Activity rootActivity)
{
    Activity activity = null;
    // Connection, command and BLOB are all disposed deterministically via "using".
    using (OracleConnection conn = new OracleConnection(ConfigurationManager.ConnectionStrings["OraclePersistenceService"].ConnectionString))
    {
        try
        {
            using (OracleCommand comd = new OracleCommand())
            {
                comd.Connection = conn;
                comd.BindByName = true;

                // Output parameter filled by the "SELECT ... INTO :state" PL/SQL block below.
                OracleParameter stateParam = comd.Parameters.Add("state", OracleDbType.Blob);
                stateParam.Direction = ParameterDirection.Output;

                OracleParameter instanceParam = comd.Parameters.Add("instanceid", OracleDbType.Char, 36);
                instanceParam.Direction = ParameterDirection.Input;
                instanceParam.Value = instanceId.ToString();

                if (!contextId.Equals(Guid.Empty))
                {
                    comd.CommandText = "Begin SELECT state into :state from ContextState WHERE uidInstanceId = :instanceid and contextId = :contextid; END;";
                    OracleParameter contextParam = comd.Parameters.Add("contextid", OracleDbType.Char, 36);
                    contextParam.Direction = ParameterDirection.Input;
                    contextParam.Value = contextId.ToString();
                }
                else
                {
                    comd.CommandText = "Begin SELECT state into :state from InstanceState WHERE uidInstanceId = :instanceid; END;";
                }

                conn.Open();
                comd.ExecuteNonQuery();

                // Read through the named parameter rather than Parameters[0], so the code does
                // not depend on the order in which parameters were added.
                using (OracleBlob blob = (OracleBlob)stateParam.Value)
                {
                    byte[] serializedState = (byte[])blob.Value;
                    activity = RestoreFromDefaultSerializedForm(serializedState, rootActivity);
                }
            }
        }
        catch (Exception ex)
        {
            RaiseException(instanceId,
                "Exception in deserialization", ex);
        }
    }
    return activity;
}
/// <summary>
/// Deletes the rows for a workflow instance from InstanceState and then from ContextState.
/// </summary>
/// <param name="instanceId">Id of the workflow instance whose rows are removed.</param>
/// <exception cref="PersistenceException">
/// Raised through RaiseException for any failure that occurs once the connection object exists.
/// </exception>
private void DeleteWorkflow(Guid instanceId)
{
    // "using" replaces the manual Close() and also disposes the command.
    using (OracleConnection conn = new OracleConnection(ConfigurationManager.ConnectionStrings["OraclePersistenceService"].ConnectionString))
    {
        try
        {
            using (OracleCommand comd = new OracleCommand("Delete InstanceState where uidInstanceId = :instanceid", conn))
            {
                comd.BindByName = true;
                // One bind variable serves both statements; only the command text changes.
                OracleParameter idParam = comd.Parameters.Add("instanceid", OracleDbType.Char, 36);
                idParam.Direction = ParameterDirection.Input;
                idParam.Value = instanceId.ToString();

                conn.Open();
                comd.ExecuteNonQuery();

                comd.CommandText = "Delete contextstate where uidInstanceId = :instanceid";
                comd.ExecuteNonQuery();
            }
        }
        catch (Exception e)
        {
            RaiseException(instanceId,
                "Exception in deletion", e);
        }
    }
}
/// <summary>
/// Throws a PersistenceException describing a failure for the given workflow instance.
/// This method never returns normally.
/// </summary>
/// <param name="instanceId">Workflow instance id included in the message.</param>
/// <param name="message">Short description of what failed.</param>
/// <param name="ex">Underlying exception, or null; when present it becomes the inner exception and its message is appended.</param>
private void RaiseException(Guid instanceId, String message, Exception ex)
{
    if (ex != null)
    {
        string detailed = String.Format("Workflow: {0} Error: {1}: Inner: {2}",
            instanceId, message, ex.Message);
        throw new PersistenceException(detailed, ex);
    }

    string summary = String.Format("Workflow: {0} Error: {1}", instanceId, message);
    throw new PersistenceException(summary);
}
#region IPendingWork Members
/// <summary>
/// Processes every queued work item: items in the ActiveInstance state are serialized,
/// all other items cause the workflow's rows to be deleted.
/// </summary>
/// <param name="transaction">Not referenced by this method.</param>
/// <param name="items">Work items previously added to the batch; each element is a WorkItem.</param>
/// <exception cref="PersistenceException">Raised via RaiseException when any item fails.</exception>
public void Commit(Transaction transaction, System.Collections.ICollection items)
{
    try
    {
        foreach (WorkItem pending in items)
        {
            if (pending.State != WorkItem.workflowState.ActiveInstance)
            {
                DeleteWorkflow(pending.instanceId);
                continue;
            }
            Serialize(pending.instanceId, pending.contextId, pending.activity);
        }
    }
    catch (Exception failure)
    {
        RaiseException(Guid.Empty, "Exception in commit", failure);
    }
}
/// <summary>
/// Completion callback for the work batch. This implementation takes no action.
/// </summary>
/// <param name="succeeded">Not used.</param>
/// <param name="items">Not used.</param>
public void Complete(bool succeeded, System.Collections.ICollection items)
{
    // Nothing to clean up.
}
/// <summary>
/// Reports whether the queued items must be committed. This implementation always says yes.
/// </summary>
/// <param name="items">Not inspected.</param>
/// <returns>Always true.</returns>
public bool MustCommit(System.Collections.ICollection items)
{
    return true;
}
#endregion
/// <summary>
/// Timer callback. While the service is in the Started state, selects the ids of all
/// InstanceState rows whose nexttimer is earlier than the database's sysdate and asks the
/// runtime to load each of those workflows.
/// </summary>
/// <param name="state">Timer state object; not used.</param>
/// <remarks>
/// Failures are reported through RaiseServicesExceptionNotHandledEvent instead of being
/// thrown, because this method runs as a timer callback and has no caller that could catch
/// the exception. A failure to load one workflow does not stop the remaining ones from loading.
/// </remarks>
private void LoadExpiredWorkflows(object state)
{
    lock (_lockTimer)
    {
        if (State != WorkflowRuntimeServiceState.Started)
        {
            return;
        }

        // Collect the ids first so the reader and connection are released before any
        // workflow is loaded.
        List<Guid> expiredIds = new List<Guid>();
        try
        {
            using (OracleConnection conn = new OracleConnection(ConfigurationManager.ConnectionStrings["OraclePersistenceService"].ConnectionString))
            using (OracleCommand comd = new OracleCommand("Select uidInstanceId from InstanceState where nexttimer < sysdate ", conn))
            {
                conn.Open();
                using (OracleDataReader reader = comd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        expiredIds.Add(new Guid(reader["uidInstanceId"].ToString()));
                    }
                }
            }
        }
        catch (Exception ex)
        {
            RaiseServicesExceptionNotHandledEvent(
                new PersistenceException(
                    String.Format("Workflow: {0} Error: {1}: Inner: {2}",
                        Guid.Empty, "Exception in loading expired workflows", ex.Message), ex),
                Guid.Empty);
            return;
        }

        foreach (Guid expiredId in expiredIds)
        {
            try
            {
                Runtime.GetWorkflow(expiredId).Load();
            }
            catch (Exception ex)
            {
                // Report and continue with the next workflow.
                RaiseServicesExceptionNotHandledEvent(
                    new PersistenceException(
                        String.Format("Workflow: {0} Error: {1}: Inner: {2}",
                            expiredId, "Exception in loading expired workflows", ex.Message), ex),
                    expiredId);
            }
        }
    }
}
}
/// <summary>
/// Unit of pending work that the persistence service adds to the work batch and later
/// processes in Commit.
/// </summary>
class WorkItem
{
    /// <summary>
    /// Tells Commit what to do with the item.
    /// </summary>
    public enum workflowState
    {
        /// <summary>
        /// The activity is to be serialized and stored.
        /// </summary>
        ActiveInstance = 0,
        /// <summary>
        /// The workflow has completed or terminated; its stored rows are to be deleted.
        /// </summary>
        Completed = 1
    }

    /// <summary>Requested handling for this item.</summary>
    public workflowState State;
    /// <summary>Id of the workflow instance the item belongs to.</summary>
    public Guid instanceId;
    /// <summary>Context id of a completed scope, or Guid.Empty for an instance-level item.</summary>
    public Guid contextId;
    /// <summary>Activity to be serialized.</summary>
    public Activity activity;
}
}
Tags: Oracle, Persistence, WF 3.5