How to Run Synchronization Programmatically via a Virtual Pipeline Batch

In the previous article I explained how to run the Data Exchange Framework (DEF) synchronization process programmatically using the Pipeline Batch item that ships with Sitecore. Since then, I’ve noticed that I can’t always use a Pipeline Batch item programmatically. When I run a pipeline from code, I pass some contextual data into it, and if that data is missing, the import cannot succeed. So if someone runs the import manually from the batch item, it fails because that contextual data is missing. This misleads users who work with DEF through the Sitecore interface.

I decided to move the pipeline batches that should only be run from code out of Sitecore (the pipeline items themselves stay there), but I still wanted a log for each import so I could keep track of the process.

The solution is a virtual pipeline batch: a PipelineBatch object built in memory instead of being read from a batch item. I spent some time debugging and studying DEF, and finally found out how to implement it.

Each Pipeline Batch uses a PipelineBatchProcessor behind the scenes to control the import and run its pipelines, so that is where I started. Here is my VirtualPipelineBatchProcessor:


using System;
using Sitecore.DataExchange.Contexts;
using Sitecore.DataExchange.Models;
using Sitecore.DataExchange.Processors;
using Sitecore.DataExchange.Processors.PipelineBatches;
using Sitecore.Services.Core.Diagnostics;

namespace PipelineBatchRunner
{
    /// <summary>
    /// Pipeline batch processor for virtual (in-memory) pipeline batches.
    /// Its own overrides only write start / finish / abort entries to the logger that DEF
    /// passes in when processing starts.
    /// </summary>
    public class VirtualPipelineBatchProcessor : BasePipelineBatchProcessor
    {
        /// <summary>
        /// Logger captured in <see cref="OnStartingProcessing"/>; null until processing has started.
        /// </summary>
        protected ILogger _logger { get; set; }

        /// <summary>
        /// Captures the batch logger, then delegates to the base implementation.
        /// </summary>
        protected override void OnStartingProcessing(PipelineBatch pipelineBatch, PipelineBatchContext pipelineBatchContext, ILogger logger)
        {
            // The event callbacks below only receive event args, so keep the logger for them.
            _logger = logger;
            base.OnStartingProcessing(pipelineBatch, pipelineBatchContext, logger);
        }

        /// <summary>Logs (at Info level) that processing was requested.</summary>
        protected override void OnStarting(PipelineBatchProcessorEventArgs e)
        {
            LogEvent(e, "Pipeline processing Requested At:", false);
        }

        /// <summary>Logs (at Info level) that processing finished.</summary>
        protected override void OnFinished(PipelineBatchProcessorEventArgs e)
        {
            LogEvent(e, "Pipeline processing Last Run Finished At:", false);
        }

        /// <summary>Logs (at Error level) that processing was aborted.</summary>
        protected override void OnAborted(PipelineBatchProcessorEventArgs e)
        {
            // An abort gets its own message so it cannot be mistaken for a normal finish.
            LogEvent(e, "Pipeline processing Aborted At:", true);
        }

        /// <summary>
        /// Writes <paramref name="message"/> followed by the current UTC time through the captured logger.
        /// Does nothing when no logger has been captured or no event args were supplied.
        /// </summary>
        /// <param name="e">Event args providing the batch and its context.</param>
        /// <param name="message">Message prefix; the UTC timestamp is appended.</param>
        /// <param name="asError">True to log at Error level, false to log at Info level.</param>
        private void LogEvent(PipelineBatchProcessorEventArgs e, string message, bool asError)
        {
            var logger = _logger;
            if (logger == null || e == null)
                return;

            // Start/finish are normal milestones and must not be reported as errors.
            Action<string> logAction = asError
                ? new Action<string>(logger.Error)
                : new Action<string>(logger.Info);

            this.Log(logAction, e.PipelineBatch, e.PipelineBatchContext, message + DateTime.UtcNow, Array.Empty<string>());
        }
    }
}

VirtualPipelineBatchProcessor overrides the base methods to write a log entry when processing starts, finishes, or is aborted. The next step is VirtualPipelineBatchBuilder:


using System;
using System.Collections.Generic;
using System.Linq;
using Sitecore.Common;
using Sitecore.Data;
using Sitecore.Data.Items;
using Sitecore.DataExchange.Extensions;
using Sitecore.DataExchange.Local.Extensions;
using Sitecore.DataExchange.Loggers;
using Sitecore.DataExchange.Models;
using Sitecore.DataExchange.Plugins;
using Sitecore.DataExchange.Repositories;
using Sitecore.DataExchange.VerificationLog;
using Sitecore.Services.Core.Model;
using System.Security.Cryptography;
using System.Text;

namespace PipelineBatchRunner
{
    /// <summary>
    /// Builds an in-memory ("virtual") <see cref="PipelineBatch"/> from DEF pipeline items, so that
    /// pipelines can be run from code without a Pipeline Batch item in the content tree.
    /// </summary>
    public class VirtualPipelineBatchBuilder
    {
        /// <summary>Template ID of the DEF tenant item searched for among a pipeline item's ancestors.</summary>
        public const string TenantTemplateId = "{327A381B-59F8-4E88-B331-BEBC7BD87E4E}";

        // Database the pipeline and tenant items are read from.
        private const string SourceDatabaseName = "master";

        /// <summary>
        /// Builds a virtual batch from pipeline item IDs. IDs that are null/empty or do not resolve
        /// to an item are skipped.
        /// </summary>
        /// <param name="pipelinesToRun">IDs of the Pipeline items to run; their order is kept in the batch.</param>
        /// <param name="settings">Batch settings; default <see cref="BatchSettings"/> are used when null.</param>
        /// <returns>The virtual batch, or null when no pipeline could be resolved and converted.</returns>
        public static PipelineBatch GetVirtualPipelineBatch(List<ID> pipelinesToRun, BatchSettings settings)
        {
            if (pipelinesToRun == null)
                return null;

            var db = Sitecore.Configuration.Factory.GetDatabase(SourceDatabaseName);
            if (db == null)
                return null;

            var items = pipelinesToRun
                .Where(id => !ID.IsNullOrEmpty(id))
                .Select(id => db.GetItem(id))
                .ToList();

            return GetVirtualPipelineBatch(items, settings);
        }

        /// <summary>
        /// Builds a virtual batch from pipeline items. Null items are skipped.
        /// </summary>
        /// <param name="pipelinesToRun">Pipeline items to run; their order is kept in the batch.</param>
        /// <param name="settings">Batch settings; default <see cref="BatchSettings"/> are used when null.</param>
        /// <returns>The virtual batch, or null when no pipeline could be converted.</returns>
        public static PipelineBatch GetVirtualPipelineBatch(List<Item> pipelinesToRun, BatchSettings settings)
        {
            if (pipelinesToRun == null)
                return null;

            return GetVirtualPipelineBatch(pipelinesToRun.Where(q => q != null).Select(q => q.GetItemModel()).ToList(), settings);
        }

        /// <summary>
        /// Builds a virtual batch from pipeline item models.
        /// </summary>
        /// <param name="pipelinesToRun">Pipeline item models to run. Null entries and models that cannot be
        /// converted to a <see cref="Pipeline"/> are skipped.</param>
        /// <param name="settings">Batch settings; default <see cref="BatchSettings"/> are used when null.</param>
        /// <returns>
        /// The virtual batch, or null when <paramref name="pipelinesToRun"/> is null, empty, or none of its
        /// entries could be converted.
        /// </returns>
        public static PipelineBatch GetVirtualPipelineBatch(List<ItemModel> pipelinesToRun, BatchSettings settings)
        {
            if (pipelinesToRun == null)
                return null;

            // Drop nulls up front so hashing, tenant lookup and conversion all see the same list.
            var pipelineModels = pipelinesToRun.Where(model => model != null).ToList();
            if (pipelineModels.Count == 0)
                return null;

            var db = Sitecore.Configuration.Factory.GetDatabase(SourceDatabaseName);

            var virtualBatch = new PipelineBatch();
            virtualBatch.Enabled = true;
            virtualBatch.Identifier = GetBatchIdentifier(pipelineModels);
            virtualBatch.PipelineBatchProcessor = new VirtualPipelineBatchProcessor();
            // The tenant is taken from the first pipeline's ancestors.
            virtualBatch.Tenant = db == null
                ? null
                : GetTenant(db.GetItem(pipelineModels[0].GetItemId().ToID()));

            (settings ?? new BatchSettings()).ApplySettings(virtualBatch);

            foreach (var pipelineModel in pipelineModels)
            {
                var pipeline = GetPipeline(pipelineModel);

                // A null entry in Pipelines would break name building below and the run itself.
                if (pipeline != null)
                    virtualBatch.Pipelines.Add(pipeline);
            }

            if (!virtualBatch.Pipelines.Any())
                return null;

            virtualBatch.Name = "VirtualBatch ("
                + string.Join("|", virtualBatch.Pipelines.Select(p => (p.Name ?? string.Empty).Replace(" ", ".")))
                + ")";

            return virtualBatch;
        }

        /// <summary>
        /// Finds the tenant item among the ancestors of <paramref name="item"/> and converts it to a <see cref="Tenant"/>.
        /// </summary>
        /// <param name="item">A pipeline item; may be null.</param>
        /// <returns>The converted tenant, or null when the item is null, no tenant ancestor exists, or conversion fails.</returns>
        public static Tenant GetTenant(Item item)
        {
            if (item == null)
                return null;

            var tenantTemplateId = new ID(TenantTemplateId);

            // Ancestors are reversed before searching, presumably so the closest tenant wins
            // (assumes GetAncestors returns root first — TODO confirm).
            var tenantItem = item.Axes.GetAncestors().Reverse().FirstOrDefault(x => x.TemplateID == tenantTemplateId);

            var tenantModel = tenantItem?.GetItemModel();

            var converter = tenantModel?.GetConverter<Tenant>(Sitecore.DataExchange.Context.ItemModelRepository);
            if (converter == null)
                return null;

            var convertResult = converter.Convert(tenantModel);

            return convertResult.WasConverted ? convertResult.ConvertedValue : null;
        }

        /// <summary>
        /// Converts a pipeline item model to a <see cref="Pipeline"/> using its DEF converter.
        /// </summary>
        /// <returns>The pipeline, or null when the model is null, has no converter, or conversion fails.</returns>
        protected static Pipeline GetPipeline(ItemModel itemModel)
        {
            var converter = itemModel?.GetConverter<Pipeline>(Sitecore.DataExchange.Context.ItemModelRepository);

            if (converter == null)
                return null;

            var convertResult = converter.Convert(itemModel);

            return convertResult.WasConverted ? convertResult.ConvertedValue : null;
        }

        /// <summary>
        /// Returns the SHA-256 hash of the UTF-8 bytes of <paramref name="inputString"/> as 64 uppercase hex characters.
        /// </summary>
        public static string GetHash(string inputString)
        {
            using (HashAlgorithm algorithm = SHA256.Create())
            {
                var bytes = algorithm.ComputeHash(Encoding.UTF8.GetBytes(inputString));
                var sb = new StringBuilder(bytes.Length * 2);
                foreach (var b in bytes)
                {
                    sb.Append(b.ToString("X2"));
                }
                return sb.ToString();
            }
        }

        /// <summary>
        /// Derives a deterministic GUID-formatted batch identifier from the pipeline item IDs, so the same
        /// pipelines in the same order always produce the same identifier.
        /// </summary>
        private static string GetBatchIdentifier(IEnumerable<ItemModel> pipelineModels)
        {
            var key = string.Join("|", pipelineModels.Select(model => model.GetItemId().ToID().ToShortID().ToString()));

            // BatchSettings.ApplySettings parses Identifier with Guid.Parse, which rejects the full
            // 64-character SHA-256 hex string; its first 32 hex characters form a valid "N"-format GUID.
            return new Guid(GetHash(key).Substring(0, 32)).ToString();
        }
    }
}

The main entry point:

GetVirtualPipelineBatch(List<ID> pipelinesToRun, BatchSettings settings), where the pipelinesToRun parameter contains the IDs of the Pipeline items to run. This mirrors selecting pipelines in the “Pipelines To Run” field of a regular Pipeline Batch item. BatchSettings lets you configure how the pipelines are executed; its definition (BatchSettings.cs) is below.


using System;
using System.Collections.Generic;
using Sitecore.DataExchange.Loggers;
using Sitecore.DataExchange.Models;
using Sitecore.DataExchange.Plugins;
using Sitecore.DataExchange.VerificationLog;

namespace PipelineBatchRunner
{
    /// <summary>
    /// Settings applied to a virtual pipeline batch as DEF plugins.
    /// The plugins are: telemetry, supported modes, batch summary/log levels, Sitecore item settings,
    /// and verification log settings.
    /// </summary>
    public class BatchSettings
    {
        /// <summary>
        /// Creates settings with these defaults:
        /// telemetry off, stack traces included, no supported modes,
        /// all log levels (Debug, Error, Fatal, Info, Warn) enabled, and verification disabled.
        /// </summary>
        public BatchSettings()
        {
            TelemetryEnabled = false;
            IsIncludeStackTraceForExceptions = true;
            SupportedModes = new List<string>();
            LogLevels = new List<LogLevel>
            {
                LogLevel.Debug,
                LogLevel.Error,
                LogLevel.Fatal,
                LogLevel.Info,
                LogLevel.Warn
            };
            VerificationLogSettings = new VerificationLogSettings
            {
                SaveJson = false,
                VerificationEnabled = false,
                VerificationLog = null
            };
        }

        /// <summary>Value used for <c>TelemetryActivitySettings.Enabled</c>.</summary>
        public bool TelemetryEnabled { get; set; }

        /// <summary>Value used for <c>PipelineBatchSummary.IncludeStackTraceForExceptions</c>.</summary>
        public bool IsIncludeStackTraceForExceptions { get; set; }

        /// <summary>Verification log plugin added to the batch as-is; skipped when null.</summary>
        public VerificationLogSettings VerificationLogSettings { get; set; }

        /// <summary>Modes copied into the batch's <c>MultiModeSupportSettings</c> plugin.</summary>
        public List<string> SupportedModes { get; set; }

        /// <summary>Log levels copied into the batch's <c>PipelineBatchSummary</c> plugin.</summary>
        public List<LogLevel> LogLevels { get; set; }

        /// <summary>
        /// Adds the plugins described by these settings to <paramref name="pipelineBatch"/>.
        /// </summary>
        /// <param name="pipelineBatch">Batch to configure; its <c>Identifier</c> must be a GUID string.</param>
        /// <exception cref="ArgumentNullException"><paramref name="pipelineBatch"/> is null.</exception>
        /// <exception cref="FormatException">The batch identifier is not a GUID.</exception>
        public void ApplySettings(PipelineBatch pipelineBatch)
        {
            if (pipelineBatch == null)
                throw new ArgumentNullException(nameof(pipelineBatch));

            pipelineBatch.AddPlugin(new TelemetryActivitySettings
            {
                Enabled = TelemetryEnabled
            });

            // Use the configured modes (copied, so later edits to this settings object do not leak into the batch).
            pipelineBatch.AddPlugin(new MultiModeSupportSettings
            {
                SupportedModes = SupportedModes != null ? new List<string>(SupportedModes) : new List<string>()
            });

            var pipelineBatchSummary = new PipelineBatchSummary
            {
                IncludeStackTraceForExceptions = IsIncludeStackTraceForExceptions
            };
            if (LogLevels != null)
            {
                foreach (var logLevel in LogLevels)
                {
                    pipelineBatchSummary.LogLevels.Add(logLevel);
                }
            }
            pipelineBatch.AddPlugin(pipelineBatchSummary);

            Guid itemId;
            if (!Guid.TryParse(pipelineBatch.Identifier, out itemId))
            {
                throw new FormatException(
                    "PipelineBatch.Identifier must be a GUID to build SitecoreItemSettings, but was '" + pipelineBatch.Identifier + "'.");
            }
            pipelineBatch.AddPlugin(new SitecoreItemSettings
            {
                ItemId = itemId
            });

            if (VerificationLogSettings != null)
                pipelineBatch.AddPlugin(VerificationLogSettings);
        }
    }
}

The code below shows how to use BatchRunner and how to pass custom data to a pipeline. I have omitted the part where the list of pipeline IDs is filled, because it can be done in different ways: from configuration files, from a search query, or from Sitecore items. It depends on your business logic and preferences.


public class PipelineBatchUsage
{
    /// <summary>
    /// Runs the selected pipelines as a virtual batch.
    /// The current xDB contact ID is passed to the pipelines through <see cref="CustomDataPlugin"/>.
    /// Nothing is run when there is no current contact.
    /// </summary>
    public void RunPipelineBatch()
    {
        var runner = new BatchRunner();
        var pipelinesToRun = new List<ID>();
        //Populate the pipelinesToRun list here.
        //Some code

        // The tracker entry point is Sitecore.Analytics.Tracker.Current. It may be null,
        // e.g. when no tracking session is active.
        // The import cannot succeed without the contact ID, so do not start a run without one.
        var contact = Sitecore.Analytics.Tracker.Current?.Contact;
        if (contact == null)
            return;

        var plugin = new CustomDataPlugin
        {
            CurrentContactId = contact.ContactId
        };

        runner.RunVirtualBatch(pipelinesToRun, new IPlugin[] { plugin });
    }
}

    /// <summary>
    /// DEF plugin that carries custom contextual data into the pipelines of a virtual batch.
    /// </summary>
    public class CustomDataPlugin : IPlugin
    {
        /// <summary>ID of the contact the import runs for; <see cref="Guid.Empty"/> until set.</summary>
        public Guid CurrentContactId { get; set; } = Guid.Empty;
    }