How to Run a Pipeline Batch Programmatically in Sitecore (a Full Guide)

Artsem Prashkovich on September 21, 2019

While working with the Data Exchange Framework (DEF), I noticed that we sometimes need to run a synchronization process programmatically. In DEF, pipeline batches are what run the pipelines.

A pipeline batch is used to configure synchronization processes and to provide basic reporting information after the processes run.

So, running synchronization programmatically means starting a pipeline batch from code. It seems simple. After a few minutes of googling, I had the first result:


Guid tenantId = ...         //tenant that owns the pipeline batch
Guid pipelineBatchId = ...  //pipeline batch
IEnumerable<PipelineBatch> batches = null;
batches = Sitecore.DataExchange.Context.TenantRepository.GetPipelineBatches(tenantId);
if (batches != null)
{
  PipelineBatch pipelineBatch = batches.FirstOrDefault(x => x.ID == pipelineBatchId);
  if (pipelineBatch != null)
  {
    var runner = new InProcessPipelineBatchRunner();
    if (runner.Run(pipelineBatch))
    {
      //pipeline batch was started
    }
  }
}

The code above looks simple and seems to solve the problem completely. At least, I thought so. But it runs the import on the same thread that calls it. A DEF sync can take a long time, so this can block the caller and cause performance issues. To avoid this, I decided to do what Sitecore itself does and recommends: run the import as a Sitecore Job, so the main thread is not blocked. In addition, we sometimes need to pass an object into the synchronization context. With these two issues in mind, I decided to write my own “Pipeline Runner.”
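Before looking at the runner itself, the core idea (start the work in the background and refuse a second start of the same batch) can be sketched without Sitecore. This is only a minimal illustration under my own assumptions, not the runner's code: a plain Task stands in for a Sitecore Job, and BackgroundBatchStarter, TryStart, and BackgroundStartDemo are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a runner; a Task replaces the Sitecore Job here.
public class BackgroundBatchStarter
{
    // Batches that are currently running, keyed by batch identifier.
    private readonly ConcurrentDictionary<string, Task> _running =
        new ConcurrentDictionary<string, Task>();

    // Returns true if the work was started, false if that batch is already running.
    public bool TryStart(string batchId, Action work)
    {
        if (batchId == null) throw new ArgumentNullException("batchId");
        if (work == null) throw new ArgumentNullException("work");

        // Cold task: nothing executes until Start() is called below.
        var task = new Task(() =>
        {
            try
            {
                work();
            }
            finally
            {
                // Unregister so the same batch can be started again later.
                Task removed;
                _running.TryRemove(batchId, out removed);
            }
        });

        // Register before starting, so a concurrent caller with the same id is rejected.
        if (!_running.TryAdd(batchId, task))
            return false;

        task.Start();
        return true;
    }
}

public static class BackgroundStartDemo
{
    public static void Run()
    {
        var starter = new BackgroundBatchStarter();
        var release = new ManualResetEventSlim(false);

        // The first start is accepted; the work blocks in the background until released.
        Console.WriteLine(starter.TryStart("batch-1", () => release.Wait())); // True
        // A second start of the same batch is rejected while the first is still running.
        Console.WriteLine(starter.TryStart("batch-1", () => { }));            // False

        release.Set(); // let the background work finish
    }
}
```

The caller returns immediately in both cases, which is the property the in-thread version lacks.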

The full version of the runner is available on GitHub. Here, I am going to explain the main parts of the implementation. The entry point of the Pipeline Runner is the RunStandardBatch method of the BatchRunner class:


public void RunStandardBatch(ID batchItemId, IPlugin[] plugins)
{
    var batchItem = Sitecore.Configuration.Factory.GetDatabase("master")
        .GetItem(batchItemId);

    if (PipelineBatchRunner == null || batchItem == null || !Helper.IsPipelineBatchItem(batchItem) || !Helper.IsItemEnabled(batchItem))
        return;

    var pipelineBatch = Helper.GetPipelineBatch(batchItem);

    Run(pipelineBatch, plugins);
}

The method takes two parameters: batchItemId, the item ID of the pipeline batch you are going to run, and plugins, an array of objects that implement the Sitecore.DataExchange.IPlugin interface and can carry any type of data. The plugins are added to the pipeline batch context and can be used later in the synchronization process. For example, I once had to import a single contact into Dynamics CRM. That contact was identified in a method of the website's business logic. I implemented a plugin like the one below, put it into the context, and then used it to read which contact to sync:


public class SingleContactSyncPlugin : Sitecore.DataExchange.IPlugin 
{ 
    public List<Guid> ContactIdsToSync { get; set; } 
}
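To show how such a plugin travels from the caller to the code that consumes it, here is a minimal, self-contained sketch. It does not use the real DEF types: IPlugin is redeclared as an empty stand-in, and SketchBatchContext and ContactFilter are hypothetical names that only mimic the add-then-look-up-by-type pattern. Falling back to a full sync when no plugin is present is my assumption, not something the runner prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-ins for illustration only; the real types live in Sitecore.DataExchange.
public interface IPlugin { }

public class SingleContactSyncPlugin : IPlugin
{
    public List<Guid> ContactIdsToSync { get; set; }
}

// Hypothetical minimal context: stores plugins and hands them back by type.
public class SketchBatchContext
{
    private readonly List<IPlugin> _plugins = new List<IPlugin>();

    public void AddPlugin(IPlugin plugin)
    {
        if (plugin != null)
            _plugins.Add(plugin);
    }

    // Returns the first plugin of type T, or null when none was added.
    public T GetPlugin<T>() where T : class, IPlugin
    {
        return _plugins.OfType<T>().FirstOrDefault();
    }
}

// Hypothetical processing step that narrows the sync to the requested contacts.
public static class ContactFilter
{
    public static IEnumerable<Guid> ContactsToSync(
        SketchBatchContext context, IEnumerable<Guid> allContacts)
    {
        var plugin = context.GetPlugin<SingleContactSyncPlugin>();

        // Assumption: without a plugin, every contact is synchronized.
        if (plugin == null || plugin.ContactIdsToSync == null)
            return allContacts;

        return allContacts.Where(id => plugin.ContactIdsToSync.Contains(id));
    }
}
```

The caller creates the plugin with the contact IDs it cares about and adds it to the context. The step that reads contacts asks the context for that plugin type and filters accordingly.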

The RunStandardBatch method does some basic preparation for running an import, such as resolving the batch model and checking that the required settings and conditions are in place. Then it calls the Run method:


protected virtual void Run(PipelineBatch batch, IPlugin[] plugins)
{
    if (batch == null)
        return;

    var pipelineBatchRunner = (InProcessPipelineBatchRunner)PipelineBatchRunner;

    if (pipelineBatchRunner != null && (!pipelineBatchRunner.IsRunningRemotely(batch) || !PipelineBatchRunner.IsRunning(batch.Identifier)))
    {
        const string category = "Data Exchange";

        var parameters = new object[]
        {
            batch,
            GetUser(),
            plugins
        };
        var options = new JobOptions(batch.Name, category, "Data Exchange Framework", this, "RunPipelineBatch", parameters);
        PipelineBatchRunner.CurrentProcesses[batch.Identifier] = JobManager.Start(options);
    }
}

This method prepares the Sitecore Job in which the import will run. It also checks that the batch is not already running. If the checks pass, it starts the job, which in turn calls the RunPipelineBatch method:


public void RunPipelineBatch(PipelineBatch pipelineBatch, User currentUser, IPlugin[] plugins)
{
    if (PipelineBatchRunner == null)
        return;

    if (currentUser == null)
        currentUser = Sitecore.Context.User;

    // Run the batch under the user captured when the job was created.
    using (new UserSwitcher(currentUser))
    {
        var pipelineBatchContext = GetPipelineBatchContext();

        // Make the custom plugins available to the synchronization process.
        if (plugins != null)
        {
            foreach (var plugin in plugins)
                pipelineBatchContext.AddPlugin(plugin);
        }

        PipelineBatchRunner.Run(pipelineBatch, pipelineBatchContext);
    }
}

The method above is similar to the code that Sitecore suggests for running a pipeline batch programmatically, with one difference: it loads custom plugins into the context.

This article explains the idea behind running a pipeline batch programmatically. The original BatchRunner class has more methods and properties. There is also a Helper.cs class that handles basic tasks, so there is no point in reproducing it here. If you are interested in this functionality, I strongly recommend getting the code from the repository.

P.S. Are you sure you need a pipeline batch item in Sitecore just to run synchronization programmatically? In the next article, I explain how to run a processing pipeline without a pipeline batch item: Run Synchronization Programmatically via a Virtual Pipeline Batch.