Sitecore Cortex and ML: Part 3 - How to Process Engine Projection Models and Datasets

In this blog post, we start working with Cortex and the Processing Engine. Let's see what it is and how it works.

The standard Processing Engine workflow looks like this:

  • Extract data from a data source repository (xDB is supported OOTB)
  • Build a data projection (create a tabular data model for ML)
  • Train on the data (generate a trained model)
  • Evaluate/predict (update some data by passing it through the trained model)
  • Use the predicted data for optimization or for updating something (e.g. contact facets)

Sitecore Cortex Processing Engine model

The basic unit of Cortex is the Processing Engine model. This model consists of three parts:

  1. Projection (prepare the data model for ML)
  2. Train (prepare the trained model)
  3. Evaluate (predict something by using the trained model)

For machine learning, we first need a model. Here, a model is a dataset represented in a tabular structure (like an Excel spreadsheet) that serves as the initial data for all our ML processes.

The first step is to implement the projection. A projection is responsible for projecting data into a tabular structure (that ML can understand) and storing the results in the Cortex Processing Engine Storage database. By default, the projection worker projects contact and interaction data from xDB, but you can extend it with your own logic to get data from a custom repository. The projection simply extracts data from the provided repository and transforms it into the required tabular model. Projection runs as a distributed worker, which means the data is processed asynchronously in batches.

After projection, the Merge worker must be executed; it merges the tables created by the projection worker into a single table. The Merge worker is a deferred worker, which means it processes all of the data at once rather than in batches like the projection worker. After projection and merging, we have the final tabular data that we can use for training. You usually don't need a custom merge worker: it is enough to register the Sitecore OOTB merge worker in the task chain after the projection worker.
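To make the batch-then-merge idea concrete, here is a minimal, self-contained sketch in plain C#. It does not use the Sitecore API: PurchaseRow, ProjectInBatches and Merge are hypothetical names invented for illustration, and in-memory lists stand in for the Processing Engine storage tables.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row type for illustration; not a Sitecore class.
public class PurchaseRow
{
    public string CustomerId { get; set; }
    public int Quantity { get; set; }
    public double UnitPrice { get; set; }
}

public static class ProjectionMergeSketch
{
    // Projection step (distributed worker): the source is split into batches
    // and each batch produces its own partial table.
    public static List<List<PurchaseRow>> ProjectInBatches(IEnumerable<PurchaseRow> source, int batchSize)
    {
        return source
            .Select((row, index) => new { row, index })
            .GroupBy(x => x.index / batchSize, x => x.row)
            .Select(batch => batch.ToList())
            .ToList();
    }

    // Merge step (deferred worker): runs once and concatenates all partial tables.
    public static List<PurchaseRow> Merge(IEnumerable<List<PurchaseRow>> partialTables)
    {
        return partialTables.SelectMany(table => table).ToList();
    }

    public static void Main()
    {
        var purchases = Enumerable.Range(1, 5)
            .Select(i => new PurchaseRow { CustomerId = "C" + i, Quantity = i, UnitPrice = 9.99 })
            .ToList();

        var partialTables = ProjectInBatches(purchases, 2); // batches of 2, 2 and 1 rows
        var merged = Merge(partialTables);                   // one table with all 5 rows

        Console.WriteLine(partialTables.Count + " partial tables, " + merged.Count + " merged rows");
    }
}
```

The sketch only mirrors the shape of the process: many small tables are written independently, then one pass combines them. The real workers persist their tables in the Processing Engine storage rather than in memory.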

[Figure: projection, merge, train]

Sitecore Cortex model

Sitecore provides two OOTB data sources for a projection model: the Contact and Interaction repositories of xDB. If necessary, we can implement our own repository as a data source for projection instead of xDB.

For our scenario we need two models:

  1. A model with all of the information about customer purchases. We need to iterate through the Interaction repository, extract our custom purchases, transform them into a tabular structure, train on it and generate a trained model.
  2. A second model, used for evaluation and prediction. The Contact repository is the data source for this model. We iterate through the Contact repository and predict contact clusters using the trained model.

Let's implement our models and projections (data transformations).

For our first model we need to implement a projection that transforms all customer purchases into the following tabular presentation:

[Figure: models and projections source]

To create a new model, we create a class that implements IModel<T>, where T is the entity type being projected.

For our first model we use the Interaction repository as the data source, from which we can retrieve all our custom PurchaseOutcome events. Our model has the following projection:

using System.Collections.Generic;
using System.Linq;

public class PurchaseOutcomeModel : IModel<Interaction>
{
    // The options dictionary is a required constructor parameter.
    public PurchaseOutcomeModel(IReadOnlyDictionary<string, string> options)
    {
        // Expand each interaction into its PurchaseOutcome events and map them to columns.
        this.Projection = Sitecore.Processing.Engine.Projection.Projection.Of<Interaction>()
            .CreateTabular("PurchaseOutcome",
                interaction => interaction.Events.OfType<PurchaseOutcome>(),
                cfg => cfg.Key("ID", x => x.Id)
                    .Attribute("InvoiceId", x => x.InvoiceId)
                    .Attribute("Quantity", x => x.Quantity)
                    .Attribute("Timestamp", x => x.Timestamp)
                    .Attribute("UnitPrice", x => (double)x.MonetaryValue) // decimal is not supported
                    .Attribute("CustomerId", x => x.CustomerId)
            );
    }

    public IProjection<Interaction> Projection { get; set; }
    ...
}

Important: the IReadOnlyDictionary<string, string> options parameter is required in the model constructor. It is used to pass additional data to the model.

Note: the decimal type is not supported in projections, so we cast it to double: Attribute("UnitPrice", x => (double)x.MonetaryValue). The supported field types are:

public enum FieldDataType
  {
    Guid,
    String,
    Int64,
    Double,
    DateTime
  }

Be aware: you can also use grouping in your projection, for example if you need data aggregated per job title (columns "JobTitle", "YearsOfExperience", "Count"). For this, use the Measure method. Make sure that your Key is NOT UNIQUE in the initial dataset; otherwise there is nothing to group. Example:

Sitecore.Processing.Engine.Projection.Projection.Of<Contact>().CreateTabular(
    "Example",
    Expansion,
    cfg => cfg
        .Key("JobTitle", c => c.Personal().JobTitle)
        .Measure("YearsOfExperience", c => c.CareerInfo().YearsExperience) // Sample facet
        .Measure("Count", x => 1));
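As a rough mental model of what such a grouped projection yields, the following self-contained sketch does the equivalent with plain LINQ. It assumes that measures of rows sharing the same key are summed; check the projection framework documentation for the actual aggregation behavior. ContactRow, JobTitleRow and GroupByJobTitle are hypothetical names, not Sitecore types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical input row; stands in for a contact with a career facet.
public class ContactRow
{
    public string JobTitle { get; set; }
    public long YearsExperience { get; set; }
}

// Hypothetical output row; one per distinct key value.
public class JobTitleRow
{
    public string JobTitle { get; set; }
    public long YearsOfExperience { get; set; }
    public long Count { get; set; }
}

public static class MeasureGroupingSketch
{
    // Rows with the same key (JobTitle) collapse into one row;
    // each measure is summed across the group (assumption, see lead-in).
    public static List<JobTitleRow> GroupByJobTitle(IEnumerable<ContactRow> contacts)
    {
        return contacts
            .GroupBy(c => c.JobTitle)
            .Select(g => new JobTitleRow
            {
                JobTitle = g.Key,
                YearsOfExperience = g.Sum(c => c.YearsExperience),
                Count = g.Sum(c => 1L)
            })
            .ToList();
    }

    public static void Main()
    {
        var contacts = new List<ContactRow>
        {
            new ContactRow { JobTitle = "Developer", YearsExperience = 3 },
            new ContactRow { JobTitle = "Developer", YearsExperience = 5 },
            new ContactRow { JobTitle = "Manager", YearsExperience = 10 }
        };

        foreach (var row in GroupByJobTitle(contacts))
        {
            Console.WriteLine(row.JobTitle + " | " + row.YearsOfExperience + " | " + row.Count);
        }
    }
}
```

With a unique key (for example the contact ID) every group would contain exactly one row, which is why the key must repeat in the source data for grouping to have any effect.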

Our second model implements IModel<Contact>. For this model we need to retrieve the RFM facet of our contacts:

public class ContactModel : IModel<Contact>
{
    public ContactModel(IReadOnlyDictionary<string, string> options)
    {
        // Contacts without an RFM facet are projected with Enabled = 0 and zeroed values.
        Projection = Sitecore.Processing.Engine.Projection.Projection.Of<Contact>().CreateTabular(
            "ContactModel",
            cfg => cfg
                .Key("ContactId", c => c.Id)
                .Attribute("Enabled", c => c.GetFacet<RfmContactFacet>() == null ? 0 : 1)
                .Attribute("R", c => c.GetFacet<RfmContactFacet>() == null ? 0 : c.GetFacet<RfmContactFacet>().R)
                .Attribute("F", c => c.GetFacet<RfmContactFacet>() == null ? 0 : c.GetFacet<RfmContactFacet>().F)
                .Attribute("M", c => c.GetFacet<RfmContactFacet>() == null ? 0 : c.GetFacet<RfmContactFacet>().M)
                .Attribute("Recency", c => c.GetFacet<RfmContactFacet>() == null ? 0 : c.GetFacet<RfmContactFacet>().Recency)
                .Attribute("Frequency", c => c.GetFacet<RfmContactFacet>() == null ? 0 : c.GetFacet<RfmContactFacet>().Frequency)
                .Attribute("Monetary", c => c.GetFacet<RfmContactFacet>() == null ? 0 : c.GetFacet<RfmContactFacet>().Monetary)
                .Attribute("Email", c => c.Emails()?.PreferredEmail?.SmtpAddress, nullable: true));
    }

    public IProjection<Contact> Projection { get; }
    …
}

NOTE: If an attribute value can be null in a projection, you should add the "nullable: true" parameter (see the "Email" attribute in the code above); otherwise an error will be thrown.

You must also register the Cortex models in the Processing Engine configuration. Add a new file "sc.Demo.Models.xml" to the "ProcessingEngine\App_Data\Config\Sitecore\Processing\" folder:

<Settings>
  <Sitecore>
    <Processing>
      <Services>
        <PurchaseInteractionModel>
          <Type>Demo.Foundation.ProcessingEngine.Train.Models.PurchaseOutcomeModel, Demo.Foundation.ProcessingEngine</Type>
        </PurchaseInteractionModel>
        <ContactModel>
          <Type>Demo.Foundation.ProcessingEngine.Predict.Models.ContactModel, Demo.Foundation.ProcessingEngine</Type>
        </ContactModel>
      </Services>
    </Processing>
  </Sitecore>
</Settings>

If you skip this registration, you will get the following error:

Sitecore.Processing.Engine.ML.ModelNotFoundInAllowedModelsException
Type not found in allowed models dictionary. Ensure model is registered correctly in configuration.

A second common projection error is:

[Error] Error Executing worker. Task Id: 93804cf1-4ac6-4e35-844d-4b411d0571c6.
Sitecore.Processing.Engine.Exceptions.DeferredWorkerException: Error Running Deferred Worker. ---> System.InvalidOperationException: ITableStore with schemaName ... has no tables starting with prefix ....
   at Sitecore.Processing.Engine.ML.Workers.MergeWorker.<RunAsync>d__5.MoveNext()

This error means that your projection did not populate the Processing Engine blob storage database. Make sure that your projection actually extracts data from the repository (you can verify this with a test request to xDB that retrieves the same data).
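Before querying xDB, it can also help to check the expansion logic in isolation. The sketch below is self-contained and uses in-memory stand-ins: Event, PageViewEvent, PurchaseOutcome and Interaction here are simplified hypothetical classes, not the real xConnect types, and CountRows is a helper invented for illustration. It applies the same kind of expansion used in the purchase model and counts the rows it would yield.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in types for illustration only; the real Interaction and event
// classes come from xConnect and the custom collection model.
public class Event { }
public class PageViewEvent : Event { }
public class PurchaseOutcome : Event { public double MonetaryValue { get; set; } }
public class Interaction { public List<Event> Events { get; } = new List<Event>(); }

public static class ExpansionCheckSketch
{
    // Same shape as the expansion passed to CreateTabular in the purchase model.
    public static IEnumerable<PurchaseOutcome> Expand(Interaction interaction)
    {
        return interaction.Events.OfType<PurchaseOutcome>();
    }

    // Counts how many rows the expansion would yield for a set of interactions.
    public static int CountRows(IEnumerable<Interaction> interactions)
    {
        return interactions.SelectMany(i => Expand(i)).Count();
    }

    public static void Main()
    {
        var withPurchase = new Interaction();
        withPurchase.Events.Add(new PageViewEvent());
        withPurchase.Events.Add(new PurchaseOutcome { MonetaryValue = 19.5 });

        var withoutPurchase = new Interaction();
        withoutPurchase.Events.Add(new PageViewEvent());

        // One purchase event across both interactions.
        Console.WriteLine(CountRows(new[] { withPurchase, withoutPurchase }));
        // No purchase events at all: the projection would have nothing to write.
        Console.WriteLine(CountRows(new[] { withoutPurchase }));
    }
}
```

If the expansion returns zero rows for data you expect to match, the event type filter or the data itself is the likely culprit, rather than the merge worker.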

Be sure to review the following documentation from Sitecore to get a better understanding of the above solution details.

https://doc.sitecore.com/developers/91/sitecore-experience-platform/en/projection-framework.html
https://doc.sitecore.com/developers/91/sitecore-experience-platform/en/model-wrappers.html
https://doc.sitecore.com/developers/91/sitecore-experience-platform/en/projection-worker.html

Table of contents: Dive into Sitecore Cortex and Machine Learning - Introduction

Read next: Part 4 - Processing Engine Workers, Options dictionary, Agents and Task Manager

