
Creating a Business Event Warehouse

This article describes how to implement a data warehouse by subscribing to business events from Quavo and using a combination of AWS tools and Snowflake to consume those events.  Please note that this is only one approach and there may be a different approach that is more appropriate for your organization.


Business Events

Become a Consumer

To get started with Business Events, take the following steps:

  1. Talk to your Technology partners about the feasibility of consuming events and constructing a data warehouse.

  2. Obtain a webhook endpoint to which Quavo can publish the event stream. Separate endpoints should be provided for dev, staging, and production.

  3. Review the list of available events and determine which ones you'll subscribe to.

  4. Let your Client Experience (CX) manager know that you'd like to start the process to implement Business Event Streaming.

Once these steps have been completed, your CX Manager will open a project with the Services team. The Services team will configure you as an event consumer within QFD.

It is recommended that an Authentication Profile be used; however, basic API Key authentication is also supported.

Events

Quavo will configure QFD so it knows which events should be tracked and published.

Event Stream

QFD automatically processes the event queues and sends events to the defined endpoints based on the configured settings.

Each transmission performs a Connect-REST call against the configured endpoint using the configured authentication profile.

This processing creates Business Event instances.

Requeuing Business Events

This utility provides a method to requeue events by TransmissionStatus, Start/End DateTime window, or specific EventId.


AWS

Quavo's infrastructure for mapping events to Snowflake is built around a Node.js Lambda function that writes to a Kinesis Firehose data/delivery stream, which ultimately delivers the JSON payload to our Snowflake S3 bucket.

qfd-data-api-private

https://bitbucket.org/quavo-inc/qfd-data-api-private/src/dev/

The qfd-data-api-private repository contains the postDataEventBusiness Lambda function implementation and its CloudFormation template.

The template.yaml CloudFormation template sets up all of the artifacts used by the Lambda function (Kinesis delivery stream, Kinesis data stream, internal IAM roles, Snowflake IAM role). This allows for automatic configuration and minimal manual setup when deploying to staging and production.

postDataEventBusiness Lambda function

At this time only postDataEventBusiness is in use. This is the Lambda function invoked through the API Gateway when the https://data-api-private{-dev|-stg}.quantumdisputes.com/v1/data/event/ endpoint is called by Pega (per the Business Event Consumer rule configuration).

This simple Lambda function wraps the AWS.Kinesis library and calls the kinesis.putRecord method to add the JSON payload from the REST request to the S3 files in our Snowflake S3 bucket.

S3 Bucket quavo-snowflake-qfd-{dev|stg|prd}

Each event is loaded into an S3 file under a subdirectory that correlates to the Business Event rule name. These directories are further qualified by date and, ultimately, by the tenant ID from which the events originate.

As business events are sent to the Lambda function, they accumulate into a file that typically represents about 10 minutes of records for that event type. Each individual record is delimited by a newline in the file, as in the hypothetical example below.
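
For illustration only, a single Accounting record inside one of these files might look like the following. The field names come from the merge task shown later in this article; all values are hypothetical, and real payloads may contain additional fields.

{"TenantId":"tenant01","ClientId":"client01","ClaimId":"C-100200","DisputeId":"D-100201","ExecutionMethod":"Automated","StepIdentifier":"IssueProvisionalCredit","EventDateTime":"2024-01-15T14:05:00.000Z","EventIdentifier":"Accounting","EventItemId":"EVT-000123","EntryList":[{"EntryIdentifier":"ENT-000456","PerformedOn":"2024-01-15T14:04:58.000Z","PerformedByOperatorID":"system","ExecutedOn":"2024-01-15T14:04:59.000Z","ExecutedByOperatorID":"system","CollectionName":"ProvisionalCredit","DebitCredit":"Credit","Amount":125.00,"Reason":"ProvisionalCredit"}]}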

SQS Notification Queue

As each file is completed by the Kinesis delivery stream, a notification is sent to an SQS queue that tells Snowflake the file is ready to be consumed.

Snowflake

Snowflake ingests each S3 file as it is notified via the SQS queue. For each event type, a stage, snowpipe, staging table, structured final table, merge task, and staging table cleanup task should be set up. A storage integration (quavo_snowflake_qfd_{dev|stg|prod}_s3) and a file format (QFD.{DEV|STG|PROD}.JSON_FILE_FORMAT) are also used, but these already exist, are shared across all event types, and do not need to be recreated.

To start, you need to be in the "DATAARCHITECT" role for the schema you are working in. For this example we are using QFDDEVDATAARCHITECT. You should develop and test initially in the DEV schema before making the same changes in the STG and PROD schemas.
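
For example, to set the session context for the DEV environment before running the statements below:

USE ROLE QFDDEVDATAARCHITECT;
USE SCHEMA QFD.DEV;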

Snowflake Stage

create or replace stage QFD.DEV.ACCOUNTING
	storage_integration = "quavo_snowflake_qfd_dev_s3" 
    url = 's3://quavo-snowflake-qfd-dev/load/Accounting/'
    file_format = QFD.DEV.JSON_FILE_FORMAT;
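
Once the stage is created, you can verify that it resolves to the expected bucket path by listing the files it can see (assuming files have already been delivered for this event type):

LIST @QFD.DEV.ACCOUNTING;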

Staging Table (Unstructured)

This is an unstructured database table that contains a single VARIANT column containing the raw JSON records.

create or replace TABLE QFD.DEV.ACCOUNTING_STG (
	EVENTPAYLOAD VARIANT
);
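
Each row holds one complete event as semi-structured JSON. Individual fields can be read with Snowflake's path notation, which is how the merge task below extracts them; for example, once the snowpipe has loaded some files:

SELECT
    EVENTPAYLOAD:EventIdentifier::STRING AS EVENT_NAME,
    EVENTPAYLOAD:EventDateTime::TIMESTAMP_NTZ AS EVENT_TIME,
    EVENTPAYLOAD
FROM QFD.DEV.ACCOUNTING_STG
LIMIT 10;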

Snowpipe

create or replace pipe QFD.DEV.ACCOUNTING auto_ingest=true as copy into qfd.dev.accounting_stg
  from @qfd.dev.accounting
  file_format = QFD.DEV.JSON_FILE_FORMAT
  MATCH_BY_COLUMN_NAME = NONE;
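
Because the pipe is created with auto_ingest=true, Snowflake exposes an SQS queue ARN in the pipe's notification_channel property; this is the queue that the S3 bucket's event notifications target. In Quavo's setup that wiring already exists on the AWS side (see the SQS Notification Queue section above), but the ARN can be confirmed from Snowflake:

DESC PIPE QFD.DEV.ACCOUNTING;
-- or, to list every pipe in the schema along with its notification channel:
SHOW PIPES IN SCHEMA QFD.DEV;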

Stream on Staging Table

The stream monitors for appended rows in the staging table. This is used in the merge task.

create or replace stream QFD.DEV.ACCOUNTING_STREAM_APPEND_ONLY on table QFD.DEV.ACCOUNTING_STG append_only = true;
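
To verify that the stream is capturing newly ingested rows, you can check it directly; selecting from a stream does not advance its offset, so this is safe to run at any time:

SELECT SYSTEM$STREAM_HAS_DATA('QFD.DEV.ACCOUNTING_STREAM_APPEND_ONLY');
-- Peek at the pending rows without consuming them:
SELECT * FROM QFD.DEV.ACCOUNTING_STREAM_APPEND_ONLY LIMIT 10;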


Final Table (Structured)

This is the final structured table which will house the data for reporting.

create or replace TABLE QFD.DEV.ACCOUNTING (
	TENANTID VARCHAR(32),
	CLIENTID VARCHAR(64),
	ENTRYIDENTIFIER VARCHAR(64),
	CLAIMID VARCHAR(32),
	DISPUTEID VARCHAR(32),
	PERFORMEDON TIMESTAMP_NTZ(9),
	PERFORMEDBYOPERATORID VARCHAR(128),
	EXECUTEDON TIMESTAMP_NTZ(9),
	EXECUTEDBYOPERATORID VARCHAR(128),
	COLLECTIONNAME VARCHAR(64),
	DEBITCREDIT VARCHAR(16),
	AMOUNT NUMBER(9,2),
	REASON VARCHAR(64),
	EXECUTIONMETHOD VARCHAR(32),
	STEPIDENTIFIER VARCHAR(64),
	EVENTDATETIME TIMESTAMP_NTZ(9),
	EVENTIDENTIFIER VARCHAR(64),
	EVENTITEMID VARCHAR(64),
	primary key (EVENTITEMID, ENTRYIDENTIFIER)
);
GRANT SELECT ON TABLE QFD.DEV.ACCOUNTING to ROLE QFDDEVREADONLY;
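
Once data is flowing, reporting roles granted SELECT can query the structured table directly. A hypothetical example (the actual DEBITCREDIT values depend on your event data):

SELECT
    CLAIMID,
    SUM(CASE WHEN DEBITCREDIT = 'Credit' THEN AMOUNT ELSE 0 END) AS TOTAL_CREDITS,
    SUM(CASE WHEN DEBITCREDIT = 'Debit'  THEN AMOUNT ELSE 0 END) AS TOTAL_DEBITS
FROM QFD.DEV.ACCOUNTING
GROUP BY CLAIMID;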

Merge Task

This task will merge the unstructured EVENTPAYLOAD data from the staging table into the final structured table.

create or replace task QFD.DEV.ACCOUNTING_MERGE
	schedule='5 MINUTE'
	as merge into QFD.DEV.ACCOUNTING using (
	SELECT 
		EVENTPAYLOAD:TenantId::STRING AS TENANTID,
		EVENTPAYLOAD:ClientId::STRING AS CLIENTID,
		VALUE:EntryIdentifier::STRING AS ENTRYIDENTIFIER,
		EVENTPAYLOAD:ClaimId::STRING AS CLAIMID,
		EVENTPAYLOAD:DisputeId::STRING AS DISPUTEID,
		VALUE:PerformedOn::TIMESTAMP_NTZ AS PERFORMEDON,
		VALUE:PerformedByOperatorID::STRING AS PERFORMEDBYOPERATORID,
		VALUE:ExecutedOn::TIMESTAMP_NTZ AS EXECUTEDON,
		VALUE:ExecutedByOperatorID::STRING AS EXECUTEDBYOPERATORID,
		VALUE:CollectionName::STRING AS COLLECTIONNAME,
		VALUE:DebitCredit::STRING AS DEBITCREDIT,
		VALUE:Amount::NUMBER(9,2) AS AMOUNT,
		VALUE:Reason::STRING AS REASON,
		EVENTPAYLOAD:ExecutionMethod::STRING AS EXECUTIONMETHOD,
		EVENTPAYLOAD:EventDateTime::TIMESTAMP_NTZ AS EVENTDATETIME,
		EVENTPAYLOAD:EventIdentifier::STRING AS EVENTIDENTIFIER,
		EVENTPAYLOAD:EventItemId::STRING AS EVENTITEMID,
		EVENTPAYLOAD:StepIdentifier::STRING AS STEPIDENTIFIER
	FROM QFD.DEV.ACCOUNTING_STREAM_APPEND_ONLY, LATERAL FLATTEN(INPUT => EVENTPAYLOAD:EntryList)
    WHERE ENTRYIDENTIFIER IS NOT NULL
    QUALIFY ROW_NUMBER() OVER (PARTITION BY EVENTITEMID,ENTRYIDENTIFIER ORDER BY EVENTDATETIME DESC) = 1
) stagingTable on QFD.DEV.ACCOUNTING.EVENTITEMID = stagingTable.EVENTITEMID AND QFD.DEV.ACCOUNTING.ENTRYIDENTIFIER = stagingTable.ENTRYIDENTIFIER
when matched and stagingTable.EVENTDATETIME > QFD.DEV.ACCOUNTING.EVENTDATETIME then update set 
    clientid=stagingTable.clientid,
    claimid=stagingTable.claimid,
    disputeid=stagingTable.disputeid,
    performedon=stagingTable.performedon,
    performedbyoperatorid=stagingTable.performedbyoperatorid,
    executedon=stagingTable.executedon,
    executedbyoperatorid=stagingTable.executedbyoperatorid,
    collectionname=stagingTable.collectionname,
    debitcredit=stagingTable.debitcredit,
    amount=stagingTable.amount,
    reason=stagingTable.reason,
    executionmethod=stagingTable.executionmethod,
    stepidentifier=stagingTable.stepidentifier,
	eventdatetime=stagingTable.eventdatetime,
    eventidentifier=stagingTable.eventidentifier
when not matched then insert (
    tenantid,
    clientid,
    entryidentifier,
    claimid,
    disputeid,
    performedon,
    performedbyoperatorid,
    executedon,
    executedbyoperatorid,
    collectionname,
    debitcredit,
    amount,
    reason,
    executionmethod,
    stepidentifier,
	eventdatetime,
    eventidentifier,
	eventitemid
) values (
    stagingTable.tenantid,
    stagingTable.clientid,
    stagingTable.entryidentifier,
    stagingTable.claimid,
    stagingTable.disputeid,
    stagingTable.performedon,
    stagingTable.performedbyoperatorid,
    stagingTable.executedon,
    stagingTable.executedbyoperatorid,
    stagingTable.collectionname,
    stagingTable.debitcredit,
    stagingTable.amount,
    stagingTable.reason,
    stagingTable.executionmethod,
    stagingTable.stepidentifier,
	stagingTable.eventdatetime,
    stagingTable.eventidentifier,
	stagingTable.eventitemid
);
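
Note that Snowflake creates tasks in a suspended state, so the merge task must be resumed before it will run on its schedule:

ALTER TASK QFD.DEV.ACCOUNTING_MERGE RESUME;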

Cleanup Task

This will clean up unstructured records from the staging table that are more than seven days old and have therefore already been merged into the final structured table. It runs every Sunday at 10 AM UTC (6 AM ET). When creating multiple cleanup tasks, space them out by adjusting the CRON hour (10 in this example) so they are not all running at the same time.

create or replace task QFD.DEV.ACCOUNTING_CLEANUP
	schedule='USING CRON 0 10 * * SUN UTC'
	as DELETE FROM QFD.DEV.ACCOUNTING_STG
WHERE EVENTPAYLOAD:EventDateTime::TIMESTAMP_NTZ  <= DATEADD(day, -7, current_timestamp());
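
As with the merge task, the cleanup task is created in a suspended state and must be resumed:

ALTER TASK QFD.DEV.ACCOUNTING_CLEANUP RESUME;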