Sitecore Cortex and ML: Part 4 - How to Process Engine Workers, Options Dictionary, Agents and Task Manager

The Processing Engine is made up of the following key components:

  • Message handlers receive tasks registered via the Task Manager API.
  • Agents delegate tasks to workers.
  • Workers perform data processing tasks, such as data projection or model training.

In the previous post, we implemented logic that creates Projection from Interaction repository. Next, a “Merge” process should be executed, and then the “Train” process. Workers are responsible for that.

There are two types of workers: distributed workers and deferred workers.

Distributed workers

Distributed workers read data in batches from an external data source and perform processing on that data (such as projecting it into a format that is suitable for machine learning). The data set is split into cursors and processed by multiple agents in parallel, which means that several workers are working on the same task. Examples: Projection worker, Evaluation worker.

IMPORTANT:

  • Dataset is always required for a distributed worker (as an initial data).
  • If parallel reads are not supported in your solution (for example if you use a custom dataset that doesn`t support parallel reads) - you will need to use a deferred worker.

Deferred workers

Deferred workers are called once by a single task agent and do not accept a data source by default (but you can use data source if you need). Example: Merge worker, Train worker.

As per Sitecore documentation, all workers accept a options dictionary object that inherits DeferredWorkerOptionsDictionary or DistributedWorkerOptionsDictionary depending on the type of worker. You must pass in a worker options dictionary when registering a task.

Options dictionary contains:

  • information about worker and information that a worker requires
  • information about Model
  • information about storage and data schema

Task Manager

The task manager is responsible for registering a task or chain of tasks.

For our scenario we need to register our workers and run corresponding tasks in a chain: Projection => Merge => Train => Evaluate.

A more detailed chain of processes for our demo is the following:

Sitecore cortex training model

Small clarification about the Evaluate and Predict blocks since sometimes it is not clear what is the difference between them. When Evaluation worker is executed it calls Evaluate method of the cortex model. This method receives a portion of data (batch of contacts) and calculates some values by passing this data thought train model. Next the same portion of data (batch of contacts) and calculated results come to ‘ConsumeEvaluationResultsAsync’ method of Evaluation worker where you can do any updates/optimization for your business scenario (this is a ‘Predict’ block of our diagram). It is used just for SOLID principles: “Train” and “Evaluate” are related to ML and they are disconnected from xdb, they only work with data from processing engine blob storage.

There are two ways to access Task Manager:

  • Register Task Manager in the context of the Processing Engine: just inject “ITaskManager taskManager” in a constructor.
  • Register TaskManager in the context of the role like Content Management by using service locator “var taskManager = ServiceLocator.ServiceProvider.GetService<ITaskManager>();”

Of course, we will use option 1) for our process. We have already used the second option in “Part 1 - Creating Sitecore Processing Engine Service” to registration tasks with WebApi call.

Agents

Referring to Sitecore documentation agents are responsible for processing work within the Cortex Processing Engine. Each agent represents a thread and can be configured to run in parallel. The Processing Engine ships with several default agents, including a task agent, polls the Cortex Processing Engine Tasks database at regular intervals and calls workers to perform those tasks. (They are similar to usual Sitecore agents and schedulers).

Sitecore Cortex Processing Engine

There are 2 types of agents that you can inherit from:

  • BaseAgent (runs only once on startup)
  • ReccuringAgent (runs at regular configurable intervals)

For our demo, we will register ReccuringAgent and run our workers once a day.

Note: You can also register parallel agents. More information about it you can find here https://doc.sitecore.com/developers/91/sitecore-experience-platform/en/agents.html

Let`s start building options dictionaries and workers for our train and evaluate logic.

Training Options dictionary

public class RfmTrainingWorkerOptionsDictionary : DeferredWorkerOptionsDictionary
    {
        public const string ModelTypeKey = "ModelType";
        public const string SchemaNameKey = "SchemaName";
        public const string TableNamesKey = "TableNames";
        
// Full namespace for our RfmTrainingWorker
private const string TrainingWorkerOpenGenericTypeFormat = "Demo.Foundation.ProcessingEngine.Models.RfmTrainingWorker, Demo.Foundation.ProcessingEngine";

        public RfmTrainingWorkerOptionsDictionary(
          string workerEntityTypeString,
          string modelTypeString,
          string schemaName,
          IReadOnlyList<string> tableNames,
          IReadOnlyDictionary<string, string> modelOptions)
          : this(CreateValidatedDictionaryWithWorkerType(workerEntityTypeString, modelTypeString, schemaName, tableNames, modelOptions))
        {
        }

        [JsonConstructor]
        protected RfmTrainingWorkerOptionsDictionary(IDictionary<string, string> dictionary)
          : base(dictionary)
        {
        }

        public string ModelType => this[nameof(ModelType)];

        public string SchemaName => this[nameof(SchemaName)];

        public IReadOnlyDictionary<string, string> ModelOptions => GetOptionsWithoutReservedKeys(this);

        public IReadOnlyList<string> TableNames => this.DeserializeJsonValue<IReadOnlyList<string>>(nameof(TableNames));

        public static RfmTrainingWorkerOptionsDictionary Parse(
          IReadOnlyDictionary<string, string> options)
        {
            Condition.Requires(options, nameof(options)).IsNotNull().IsNotEmpty();
            string emptyRequiredString1 = options.GetNonEmptyRequiredString("ModelType");
            IReadOnlyList<string> tableNames = JsonConvert.DeserializeObject<IReadOnlyList<string>>(options.GetNonEmptyRequiredString("TableNames"));
            string emptyRequiredString2 = options.GetNonEmptyRequiredString("WorkerType");
            IReadOnlyDictionary<string, string> withoutReservedKeys = GetOptionsWithoutReservedKeys(options);
            string emptyOptionalString = options.GetNonEmptyOptionalString("SchemaName", (string)null);
            IDictionary<string, string> validatedDictionary = CreateValidatedDictionary(emptyRequiredString1, emptyOptionalString, tableNames, withoutReservedKeys);
            validatedDictionary.Add("WorkerType", emptyRequiredString2);
            return new RfmTrainingWorkerOptionsDictionary(validatedDictionary);
        }

private static IDictionary<string, string> CreateValidatedDictionaryWithWorkerType(
          string workerEntityTypeString,
          string modelType,
          string schemaName,
          IReadOnlyList<string> tableNames,
          IReadOnlyDictionary<string, string> modelOptions)
        {
            Condition.Requires(workerEntityTypeString, nameof(workerEntityTypeString)).IsNotNullOrWhiteSpace();
            IDictionary<string, string> validatedDictionary = CreateValidatedDictionary(modelType, schemaName, tableNames, modelOptions);
            validatedDictionary.Add("WorkerType", string.Format(CultureInfo.InvariantCulture, TrainingWorkerOpenGenericTypeFormat, workerEntityTypeString));
            return validatedDictionary;
        }

        private static IReadOnlyDictionary<string, string> GetOptionsWithoutReservedKeys(
          IReadOnlyDictionary<string, string> options)
        {
            return options.Except("ModelType", "TableNames", "WorkerType", "SchemaName");
        }

        private static IDictionary<string, string> CreateValidatedDictionary(
          string modelType,
          string schemaName,
          IReadOnlyList<string> tableNames,
          IReadOnlyDictionary<string, string> modelOptions)
        {
            Condition.Requires(modelType, nameof(modelType)).IsNotNullOrWhiteSpace();
            Condition.Requires(schemaName, nameof(schemaName)).IsNotLongerThan<string>(50);
            Condition.Requires(tableNames, nameof(tableNames)).IsNotNull().IsNotEmpty();
            Condition.Requires(modelOptions, nameof(modelOptions)).IsNotNull();
            modelOptions.EnsureNotContainsKeys("ModelType", "TableNames", "WorkerType", "SchemaName");
            foreach (string tableName in tableNames)
                Condition.Requires(tableName, nameof(tableNames)).IsNotNullOrWhiteSpace().IsNotLongerThan(192, string.Format("{0} parameter should not contain table names longer than {1} characters.", (object)nameof(tableNames), (object)192));
            string str = modelType.Truncate(50);
            return new Dictionary<string, string>(modelOptions.ToDictionary(x => x.Key, x => x.Value))
            {
                [ModelTypeKey] = modelType,
                [SchemaNameKey] = (string.IsNullOrWhiteSpace(schemaName) ? str : schemaName),
                [TableNamesKey] = tableNames.SerializeToJson()
            };
        }
    }

Training worker

public class RfmTrainingWorker: IDeferredWorker    {
        private readonly IModel<Interaction> _model;
        private readonly RfmTrainingWorkerOptionsDictionary _options;
        private readonly ITableStore _tableStore;
        private readonly ILogger<RfmTrainingWorker> _logger;
        private readonly IServiceProvider _serviceProvider;
        public RfmTrainingWorker(
            ITableStoreFactory tableStoreFactory,
            IServiceProvider provider,
            ILogger<RfmTrainingWorker> logger,
            AllowedModelsDictionary modelsDictionary,
            RfmTrainingWorkerOptionsDictionary options,
            IServiceProvider serviceProvider)
        {
        
            this._tableStore = tableStoreFactory.Create(options.SchemaName);
            this._options = options;
            this._logger = logger;
            this._model = modelsDictionary.CreateModel<Interaction>(provider, options.ModelType, options.ModelOptions);
            this._serviceProvider = serviceProvider;
        }

        public RfmTrainingWorker(
            ITableStoreFactory tableStoreFactory,
            IServiceProvider provider,
            ILogger<RfmTrainingWorker> logger,
            AllowedModelsDictionary modelsDictionary,
            IReadOnlyDictionary<string, string> options,
            IServiceProvider serviceProvider)
            : this(tableStoreFactory, provider, logger, modelsDictionary, RfmTrainingWorkerOptionsDictionary.Parse(options), serviceProvider)
        {
        }

        public async Task RunAsync(CancellationToken token)
        {
            IReadOnlyList<string> tableNames = this._options.TableNames;
            List<Task<TableStatistics>> tableStatisticsTasks = new List<Task<TableStatistics>>(tableNames.Count);
            foreach (string tableName in tableNames)
                tableStatisticsTasks.Add(this._tableStore.GetTableStatisticsAsync(tableName, token));
            TableStatistics[] tableStatisticsArray = await Task.WhenAll(tableStatisticsTasks).ConfigureAwait(false);
            List<TableDefinition> tableDefinitionList = new List<TableDefinition>(tableStatisticsTasks.Count);
            for (int index = 0; index < tableStatisticsTasks.Count; ++index)
            {
                TableStatistics result = tableStatisticsTasks[index].Result;
                if (result == null)
                    this._logger.LogWarning(string.Format("Statistics data for {0} table could not be retrieved. It will not participate in model training.", (object)tableNames[index]));
                else
                    tableDefinitionList.Add(result.Definition);
            }
            ModelStatistics modelStatistics = await this._model.TrainAsync(this._options.SchemaName, token, tableDefinitionList.ToArray()).ConfigureAwait(false);

await UpdateRfmFacets(modelStatistics as RfmStatistics, token);
                   }

public async Task UpdateRfmFacets(RfmStatistics statistics, CancellationToken token)
{
 ...
}
              public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize((object)this);
        }

        protected virtual void Dispose(bool dispose)
        {
        }
    }

Note: if you need to call only Train method of your processing engine model you can inherit your worker from TrainingWorker<TEntity>, or even use Sitecore default workers like TrainingWorker<Interaction> or TrainingWorker<Contact> without any custom implementation. But in our case, we inherit our worker from IDeferredWorker, because we need additional logic inside of our worker. We need to call UpdateRfmFacets method that will initialize RFM values and store them in contact facet. We can`t do it in Evaluate worker because RFM values calculation is specific and we need access to the entire dataset, we can`t calculate RFM values by using a distributed worker.

Evaluation worker and options dictionary

For evaluation options dictionary we will use sitecore default EvaluationWorkerOptionsDictionary.

We will inherit Evaluation worker from default Sitecore EvaluationWorker<TEntity> worker, in our case it is EvaluationWorker<Contact>:

public class RfmEvaluationWorker : EvaluationWorker<Contact>
    {
        private readonly ILogger<RfmEvaluationWorker> _logger;
        private readonly IServiceProvider _serviceProvider;
        public RfmEvaluationWorker(IModelEvaluator evaluator, IReadOnlyDictionary<string, string> options, ILogger<RfmEvaluationWorker> logger, IServiceProvider serviceProvider) : base(evaluator, options)
        {
            _logger = logger;
            _serviceProvider = serviceProvider;
        }

public RfmEvaluationWorker(IModelEvaluator evaluator, EvaluationWorkerOptionsDictionary options, ILogger<RfmEvaluationWorker> logger, IServiceProvider serviceProvider) : base(evaluator, options)
        {
            _logger = logger;
            _serviceProvider = serviceProvider;
        }

        // Update Cluster for Contact
protected override async Task ConsumeEvaluationResultsAsync(IReadOnlyList<Contact> entities, IReadOnlyList<object> evaluationResults, CancellationToken token)
        {
		throw new NotImplementedException();
         }
    }

Method ConsumeEvaluationResultsAsync (“Predict” block on our diagram) will be executed at the end of our chain, once Evaluate method of MLNetService returns predicted results (these results are passed in evaluationResults parameter).

You must also register workers in processing engine configurations. Add new file “sc.Demo.Workers.xml” in “ProcessingEngine\App_Data\Config\Sitecore\Processing\” folder:

<Settings>
  <Sitecore>
    <Processing>
      <Services>
        <TaskServicesFactory>
          <Options>
			      <RfmInteractionTrainingWorker>
              <Type>Demo.Foundation.ProcessingEngine.Train.Workers.RfmTrainingWorker, Demo.Foundation.ProcessingEngine</Type>
            </RfmInteractionTrainingWorker>
            <RfmEvaluationWorker>
              <Type>Demo.Foundation.ProcessingEngine.Predict.Workers.RfmEvaluationWorker, Demo.Foundation.ProcessingEngine</Type>
            </RfmEvaluationWorker>
          </Options>
        </TaskServicesFactory>
      </Services>
    </Processing>
  </Sitecore>
</Settings>

If you don`t do it, you will catch error:

Failed to register task. Sitecore.Processing.Engine.Exceptions.TypeNotFoundInAllowedTypesException: Worker Type is not allowed.

To register and run our workers we implement DemoAgent:

public class DemoAgent: RecurringAgent
    {
        private readonly ILogger<IAgent> _logger;
        private readonly ITaskManager _taskManager;
   
public DemoAgent(IConfiguration options, ILogger<IAgent> logger, ITaskManager taskManager) : base(options, logger)
        {
            _logger = logger;
            _taskManager = taskManager;
        }

        protected override async Task RecurringExecuteAsync(CancellationToken token)
        {
        return _taskManager.RegisterRfmModelTaskChainAsync(TimeSpan.FromDays(1));
        }
    }

  public static class TaskManagerExtensionsCustom
    {

        public static async Task RegisterRfmModelTaskChainAsync(
          this ITaskManager taskManager,
          TimeSpan expiresAfter)
        {
            // Define workers parameters

            // datasource for PurchaseOutcomeModel projection
           var interactionDataSourceOptionsDictionary = new InteractionDataSourceOptionsDictionary(new InteractionExpandOptions(IpInfo.DefaultFacetKey), 5, 10);
            // datasource for ContactModel protection 
            var contactDataSourceOptionsDictionary = new ContactDataSourceOptionsDictionary(new ContactExpandOptions(PersonalInformation.DefaultFacetKey,
                    EmailAddressList.DefaultFacetKey,
                    ContactBehaviorProfile.DefaultFacetKey,
                    RfmContactFacet.DefaultFacetKey)
                , 5, 10);

            var modelTrainingOptions = new ModelTrainingTaskOptions(
                // assembly name of our processing engine model (PurchaseOutcomeModel:IModel<Interaction>) 
                typeof(PurchaseOutcomeModel).AssemblyQualifiedName,
                // assembly name of entity for our processing engine model  (PurchaseOutcomeModel:IModel<Interaction>) 
                typeof(Interaction).AssemblyQualifiedName,
                // custom options that we pass to PurchaseOutcomeModel
                new Dictionary<string, string> { ["TestCaseId"] = "Id" },
                // projection tableName of PurchaseOutcomeModel, must be equal to first parameter of 'CreateTabular' method => PurchaseOutcomeModel.cs: CreateTabular("PurchaseOutcome", ...)
                "PurchaseOutcome", 
                // name of resulted table (any name)
                "DemoResultTable");

            var projectionDictionary = new ProjectionWorkerOptionsDictionary(
                modelTrainingOptions.ModelEntityTypeString,
                modelTrainingOptions.ModelTypeString, expiresAfter, modelTrainingOptions.SchemaName,
                modelTrainingOptions.ModelOptions);

            var modelTrainingOptions2 =
                new ModelTrainingTaskOptions(typeof(ContactModel).AssemblyQualifiedName, typeof(Contact).AssemblyQualifiedName, new Dictionary<string, string> { ["TestCaseId"] = "Id" }, "ContactModel", "DemoContactResultTable");

            var evaluationDictionary = new EvaluationWorkerOptionsDictionary(
                 typeof(RfmEvaluationWorker).AssemblyQualifiedName,//"Demo.Foundation.ProcessingEngine.Predict.Workers.RfmEvaluationWorker, Demo.Foundation.ProcessingEngine",
                typeof(ContactModel).AssemblyQualifiedName,
                new Dictionary<string, string> { ["TestCaseId"] = "Id" },
                "Evaluator.Schema",
                expiresAfter);

            // Register chain of Tasks

            // 1) Register Projection-worker
            Guid projectionTaskId = await taskManager.RegisterDistributedTaskAsync(
                interactionDataSourceOptionsDictionary,
                projectionDictionary,
                // no prerequisite tasks
                Enumerable.Empty<Guid>(),
                expiresAfter).ConfigureAwait(false);

            // 2) Register Merge-worker
            var mergeTaskIds = new List<Task<Guid>>();
            foreach (var targetTableNames in modelTrainingOptions.SourceTargetTableNamesMap)
            {
                var mergeWorkerOptionsDictionary = new MergeWorkerOptionsDictionary(targetTableNames.Value, targetTableNames.Key, expiresAfter, modelTrainingOptions.SchemaName);
                mergeTaskIds.Add(
                    taskManager.RegisterDeferredTaskAsync(
                        mergeWorkerOptionsDictionary,
                        // execute after Projection task
                        new []{ projectionTaskId },
                        expiresAfter));
            }
            await Task.WhenAll(mergeTaskIds).ConfigureAwait(false);

            // 3) Register Train-worker
            var trainWorkerOptionsDictionary = new RfmTrainingWorkerOptionsDictionary(
                modelTrainingOptions.ModelEntityTypeString,
                modelTrainingOptions.ModelTypeString,
                modelTrainingOptions.SchemaName,
                modelTrainingOptions.SourceTargetTableNamesMap.Values.ToList(),
                modelTrainingOptions.ModelOptions);

            Guid trainTaskId = await taskManager.RegisterDeferredTaskAsync(
                trainWorkerOptionsDictionary, 
                // execute after Merge task
                mergeTaskIds.Select(t => t.Result),
                expiresAfter).ConfigureAwait(false);

            // 4) Register Evaluate worker
            Guid evaluateTaskId = await taskManager.RegisterDistributedTaskAsync(
                    contactDataSourceOptionsDictionary,
                    evaluationDictionary,
                    // execute after Train worker
                    new[] { trainTaskId },
                    expiresAfter)
                .ConfigureAwait(false);
        }
    }

Notice we can also pass custom parameters in options dictionary ("TestCaseId" in the example above) that can be easily retrieved in our models (PurchaseOutcomeModel and ContactModel): var id = options["TestCaseId"];. For example, it can be useful if you need to do additional filtering of the initial data for your projection model (filter by DateTime etc.).

Note: if you don`t need Evaluation worker for your project you can register chain of Projection+Merge+Train workers much easier with Sitecore extension method of Sitecore.Processing.Tasks.Options.dll (in 1 line of code): taskManager.RegisterModelTrainingTaskChainAsync()

To register our agent in the Processing Engine we need to add new “sc.Processing.Engine.DemoAgents.xml” config in “xconnect_instance\App_Data\jobs\continuous\ProcessingEngine\App_Data\Config\Sitecore\Processing\”

<Settings>
  <Sitecore>
    <Processing>
      <Services>
        <DemoAgent>
          <Type>Demo.Foundation.ProcessingEngine.Models.DemoAgent, Demo.Foundation.ProcessingEngine</Type>
          <As>Sitecore.Processing.Engine.Abstractions.IAgent, Sitecore.Processing.Engine.Abstractions</As>
          <LifeTime>Transient</LifeTime>
          <Options>
            <!-- The period which the agent sleeps before running again. -->
            <SleepPeriod>1.00:00:00.000</SleepPeriod>
          </Options>
        </DemoAgent>
      </Services>
    </Processing>
  </Sitecore>
</Settings>

Now we are ready to start processing tasks. When task agent is run you can trace processes in processing engine log file. It will look like this:

[Information] Registered Distributed Processing Task, TaskId: 903384bc-8f3c-4af7-8a92-1367baad1680, Worker: Sitecore.Processing.Engine.ML.Workers.ProjectionWorker`1[[Sitecore.XConnect.Interaction, Sitecore.XConnect, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null]], Sitecore.Processing.Engine.ML, DataSource: Sitecore.Processing.Engine.DataSources.DataExtraction.InteractionDataSource, Sitecore.Processing.Engine
[Information] Registered Deferred Processing Task, Id: 2d6f2a3d-91e3-46a7-9dc5-e8808f8d0643, Worker: Sitecore.Processing.Engine.ML.Workers.MergeWorker, Sitecore.Processing.Engine.ML
[Information] Registered Deferred Processing Task, Id: 4175af42-e800-4d3b-b1bd-14da4c219f33, Worker: Demo.Foundation.ProcessingEngine.Train.Workers.RfmTrainingWorker, Demo.Foundation.ProcessingEngine
[Information] Registered Distributed Processing Task, TaskId: 12d5f056-81af-42d3-8471-324b1f95d216, Worker: Demo.Foundation.ProcessingEngine.Predict.Workers.RfmEvaluationWorker, Demo.Foundation.ProcessingEngine, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null, DataSource: Sitecore.Processing.Engine.DataSources.DataExtraction.ContactDataSource, Sitecore.Processing.Engine
[Information] TaskAgent Executing worker. Machine: BRIMIT-SBA-PC, Process: 10728, AgentId: 9, TaskId: 903384bc-8f3c-4af7-8a92-1367baad1680, TaskType: DistributedProcessing.
[Information] TaskAgent Worker execution completed. Machine: BRIMIT-SBA-PC, Process: 10728, AgentId: 9, TaskId: 903384bc-8f3c-4af7-8a92-1367baad1680, TaskType: DistributedProcessing.
[Information] TaskAgent Executing worker. Machine: BRIMIT-SBA-PC, Process: 10728, AgentId: 9, TaskId: 2d6f2a3d-91e3-46a7-9dc5-e8808f8d0643, TaskType: DeferredAction.
[Information] TaskAgent Worker execution completed. Machine: BRIMIT-SBA-PC, Process: 10728, AgentId: 9, TaskId: 2d6f2a3d-91e3-46a7-9dc5-e8808f8d0643, TaskType: DeferredAction.
[Information] TaskAgent Executing worker. Machine: BRIMIT-SBA-PC, Process: 10728, AgentId: 9, TaskId: 4175af42-e800-4d3b-b1bd-14da4c219f33, TaskType: DeferredAction.
[Information] TaskAgent Worker execution completed. Machine: BRIMIT-SBA-PC, Process: 10728, AgentId: 9, TaskId: 4175af42-e800-4d3b-b1bd-14da4c219f33, TaskType: DeferredAction.
[Information] TaskAgent Executing worker. Machine: BRIMIT-SBA-PC, Process: 10728, AgentId: 9, TaskId: 12d5f056-81af-42d3-8471-324b1f95d216, TaskType: DistributedProcessing.
[Information] TaskAgent Worker execution completed. Machine: BRIMIT-SBA-PC, Process: 10728, AgentId: 9, TaskId: 912d5f056-81af-42d3-8471-324b1f95d216, TaskType: DistributedProcessing.

And you can also see corresponding tasks in “Processing Engine Tasks” database.

Processing Engine Tasks

Next you can trace what happens in “Processing Engine Storage” database.

  1. When projection worker starts, you will see a lot of records with key prefixes Data, DataMap/PurchaseOutcome, Stats/PurchaseOutcome. (“PurchaseOutcome” in prefixes is our parameter “modelProjectionTableName” that we passed into worker dictionary)

Processing Engine Tasks

  1. Next when merge worker starts, projection worker data will be deleted, and we will see data of Merge worker. There will also be a lot of records with prefix Data, and two records with prefixes DataMap/DemoResultTable and Stats/DemoResultTable (“DemoResultTable” in prefixes is our parameter “projectionResultsTableName” that we passed into worker dictionary)

records with prefix Data

  1. Train worker does not store anything in blob storage in our scenario. Best practise is to clear projected data, because it is no longer needed. If you don`t do it - this data will be cleared after expiration period that we defined when registered workers. But if your expiration period is bigger than the sleep period of agent, duplicates of projected data might be processed, and you can receive wrong results. (In our scenario, clusters can be calculated incorrect).
  2. When evaluate worker starts, you will see a lot of records with key prefixes Data, DataMap/ContactModel, Stats/ContactModel. (“ContactModel” in prefixes is our parameter “modelProjectionTableName” that we passed into worker dictionary). And also you can see that Namespace is “Evaluator.Schema” - it is shema name that we passed in EvaluationWorkerOptionsDictionary.

ContactModel

Here are some links that will help you understand this topic better:

Official sitecore documentation:

https://doc.sitecore.com/developers/91/sitecore-experience-platform/en/workers.html
https://doc.sitecore.com/developers/91/sitecore-experience-platform/en/task-manager.html
https://doc.sitecore.com/developers/91/sitecore-experience-platform/en/agents.html

Live demo by Alistair Deneys https://www.youtube.com/watch?v=7UEqCHrHY5Y

Table of contents Dive into Sitecore Cortex and Machine Learning - Introduction

Read next Part 5 - Implementation of Machine Learning engine


Do you need help with your Sitecore project?
VIEW SITECORE SERVICES