This article describes how to implement a data warehouse by subscribing to business events from Quavo and consuming those events with a combination of AWS tools and Snowflake. Please note that this is only one approach; a different approach may be more appropriate for your organization.
...
It is recommended that an Authentication Profile be used; however, basic API Key authentication is also supported.
Events
Quavo will configure QFD so it knows which events should be tracked and published.
Event Stream
QFD will automatically process event queues and send events to the defined endpoints based on the defined settings.
...
This creates Business Event instances.
Requeuing Business Events
This utility provides methods to requeue events by TransmissionStatus, by a Start/End DateTime window, or by a specific EventId:
...
Snowflake ingests each S3 file as it is notified via the SQS queue. For each event type, a stage, Snowpipe, staging table, structured final table, merge task, and staging table cleanup task should be set up. A storage integration (quavo_snowflake_qfd_{dev|stg|prod}_s3) and a file format (QFD.{DEV|STG|PROD}.JSON_FILE_FORMAT) are also used, but these already exist, are shared by all event types, and will not need to be recreated.
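Before creating any per-event-type objects, you can confirm the shared objects are in place. A quick sketch, assuming the naming conventions above:

```sql
-- Confirm the shared storage integration exists
SHOW STORAGE INTEGRATIONS LIKE 'quavo_snowflake_qfd_dev_s3';

-- Confirm the shared JSON file format exists in the schema
SHOW FILE FORMATS IN SCHEMA QFD.DEV;
```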
To start, you need to be in the "DATAARCHITECT" role for the schema you are working in; for this example we are using QFDDEVDATAARCHITECT. Develop and test initially in the DEV schema before making the same changes in the STG and PROD schemas.
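Setting the role and schema context first keeps the fully qualified names in the examples below consistent. A minimal sketch, assuming the role and schema names used in this article:

```sql
-- Assume the data architect role for the DEV schema
USE ROLE QFDDEVDATAARCHITECT;

-- Work in DEV first; repeat the same changes later in STG and PROD
USE SCHEMA QFD.DEV;
```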
Snowflake Stage
```sql
create or replace stage QFD.DEV.ACCOUNTING
    storage_integration = "quavo_snowflake_qfd_dev_s3"
    url = 's3://quavo-snowflake-qfd-dev/load/Accounting/'
    file_format = QFD.DEV.JSON_FILE_FORMAT;
```
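Once the stage is created, you can sanity-check that it can reach the S3 prefix. A quick check, assuming event files have already been delivered to the bucket:

```sql
-- List the files currently visible under the Accounting prefix
LIST @QFD.DEV.ACCOUNTING;
```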
Staging Table (Unstructured)
This is an unstructured database table that contains a single VARIANT column containing the raw JSON records.
```sql
create or replace TABLE QFD.DEV.ACCOUNTING_STG (
    EVENTPAYLOAD VARIANT
);
```
Snowpipe
```sql
create or replace pipe QFD.DEV.ACCOUNTING auto_ingest = true as
    copy into QFD.DEV.ACCOUNTING_STG
    from @QFD.DEV.ACCOUNTING
    file_format = QFD.DEV.JSON_FILE_FORMAT
    MATCH_BY_COLUMN_NAME = NONE;
```
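For auto-ingest to work, the S3 bucket's event notifications must target the SQS queue Snowflake created for the pipe; its ARN appears in the notification_channel column of SHOW PIPES. A sketch for retrieving it and checking pipe health:

```sql
-- notification_channel holds the SQS ARN used for S3 event notifications
SHOW PIPES LIKE 'ACCOUNTING' IN SCHEMA QFD.DEV;

-- Check the pipe's execution state and any pending file count
SELECT SYSTEM$PIPE_STATUS('QFD.DEV.ACCOUNTING');
```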
Stream on Staging Table
...
The stream monitors for appended rows in the staging table. This is used in the merge task.
```sql
create or replace stream QFD.DEV.ACCOUNTING_STREAM_APPEND_ONLY
    on table QFD.DEV.ACCOUNTING_STG
    append_only = true;
```
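You can verify the stream is capturing newly ingested rows before wiring it into the merge task. A quick check:

```sql
-- Returns TRUE if the stream currently has unconsumed rows
SELECT SYSTEM$STREAM_HAS_DATA('QFD.DEV.ACCOUNTING_STREAM_APPEND_ONLY');

-- Peek at pending rows; a plain SELECT does not consume the stream offset
SELECT EVENTPAYLOAD FROM QFD.DEV.ACCOUNTING_STREAM_APPEND_ONLY LIMIT 10;
```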
Final Table (structured)
This is the final structured table which will house the data for reporting.
```sql
create or replace TABLE QFD.DEV.ACCOUNTING (
    TENANTID VARCHAR(32),
    CLIENTID VARCHAR(64),
    ENTRYIDENTIFIER VARCHAR(64),
    CLAIMID VARCHAR(32),
    DISPUTEID VARCHAR(32),
    PERFORMEDON TIMESTAMP_NTZ(9),
    PERFORMEDBYOPERATORID VARCHAR(128),
    EXECUTEDON TIMESTAMP_NTZ(9),
    EXECUTEDBYOPERATORID VARCHAR(128),
    COLLECTIONNAME VARCHAR(64),
    DEBITCREDIT VARCHAR(16),
    AMOUNT NUMBER(9,2),
    REASON VARCHAR(64),
    EXECUTIONMETHOD VARCHAR(32),
    STEPIDENTIFIER VARCHAR(64),
    EVENTDATETIME TIMESTAMP_NTZ(9),
    EVENTIDENTIFIER VARCHAR(64),
    EVENTITEMID VARCHAR(64),
    primary key (EVENTITEMID, ENTRYIDENTIFIER)
);
GRANT SELECT ON TABLE QFD.DEV.ACCOUNTING to ROLE QFDDEVREADONLY;
```
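Once data lands, reporting queries run against this table. A hypothetical example (the aggregation and the 'Debit' value are illustrative assumptions, not part of the setup):

```sql
-- Illustrative only: net accounting amount per claim
SELECT CLAIMID,
       SUM(CASE WHEN DEBITCREDIT = 'Debit' THEN AMOUNT ELSE -AMOUNT END) AS NET_AMOUNT
FROM QFD.DEV.ACCOUNTING
GROUP BY CLAIMID;
```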
Merge Task
This task will merge the unstructured EVENTPAYLOAD data from the staging table into the final structured table.
```sql
create or replace task QFD.DEV.ACCOUNTING_MERGE
    schedule = '5 MINUTE'
as
merge into QFD.DEV.ACCOUNTING using (
    SELECT
        EVENTPAYLOAD:TenantId::STRING AS TENANTID,
        EVENTPAYLOAD:ClientId::STRING AS CLIENTID,
        VALUE:EntryIdentifier::STRING AS ENTRYIDENTIFIER,
        EVENTPAYLOAD:ClaimId::STRING AS CLAIMID,
        EVENTPAYLOAD:DisputeId::STRING AS DISPUTEID,
        VALUE:PerformedOn::TIMESTAMP_NTZ AS PERFORMEDON,
        VALUE:PerformedByOperatorID::STRING AS PERFORMEDBYOPERATORID,
        VALUE:ExecutedOn::TIMESTAMP_NTZ AS EXECUTEDON,
        VALUE:ExecutedByOperatorID::STRING AS EXECUTEDBYOPERATORID,
        VALUE:CollectionName::STRING AS COLLECTIONNAME,
        VALUE:DebitCredit::STRING AS DEBITCREDIT,
        VALUE:Amount::NUMBER(9,2) AS AMOUNT,
        VALUE:Reason::STRING AS REASON,
        EVENTPAYLOAD:ExecutionMethod::STRING AS EXECUTIONMETHOD,
        EVENTPAYLOAD:EventDateTime::TIMESTAMP_NTZ AS EVENTDATETIME,
        EVENTPAYLOAD:EventIdentifier::STRING AS EVENTIDENTIFIER,
        EVENTPAYLOAD:EventItemId::STRING AS EVENTITEMID,
        EVENTPAYLOAD:StepIdentifier::STRING AS STEPIDENTIFIER
    FROM QFD.DEV.ACCOUNTING_STREAM_APPEND_ONLY,
        LATERAL FLATTEN(INPUT => EVENTPAYLOAD:EntryList)
    WHERE ENTRYIDENTIFIER IS NOT NULL
    QUALIFY ROW_NUMBER() OVER (PARTITION BY EVENTITEMID, ENTRYIDENTIFIER ORDER BY EVENTDATETIME DESC) = 1
) stagingTable
on QFD.DEV.ACCOUNTING.EVENTITEMID = stagingTable.EVENTITEMID
    AND QFD.DEV.ACCOUNTING.ENTRYIDENTIFIER = stagingTable.ENTRYIDENTIFIER
when matched and stagingTable.EVENTDATETIME > QFD.DEV.ACCOUNTING.EVENTDATETIME then update set
    clientid = stagingTable.clientid,
    claimid = stagingTable.claimid,
    disputeid = stagingTable.disputeid,
    performedon = stagingTable.performedon,
    performedbyoperatorid = stagingTable.performedbyoperatorid,
    executedon = stagingTable.executedon,
    executedbyoperatorid = stagingTable.executedbyoperatorid,
    collectionname = stagingTable.collectionname,
    debitcredit = stagingTable.debitcredit,
    amount = stagingTable.amount,
    reason = stagingTable.reason,
    executionmethod = stagingTable.executionmethod,
    stepidentifier = stagingTable.stepidentifier,
    eventdatetime = stagingTable.eventdatetime,
    eventidentifier = stagingTable.eventidentifier
when not matched then insert (
    tenantid, clientid, entryidentifier, claimid, disputeid, performedon,
    performedbyoperatorid, executedon, executedbyoperatorid, collectionname,
    debitcredit, amount, reason, executionmethod, stepidentifier,
    eventdatetime, eventidentifier, eventitemid
) values (
    stagingTable.tenantid, stagingTable.clientid, stagingTable.entryidentifier,
    stagingTable.claimid, stagingTable.disputeid, stagingTable.performedon,
    stagingTable.performedbyoperatorid, stagingTable.executedon,
    stagingTable.executedbyoperatorid, stagingTable.collectionname,
    stagingTable.debitcredit, stagingTable.amount, stagingTable.reason,
    stagingTable.executionmethod, stagingTable.stepidentifier,
    stagingTable.eventdatetime, stagingTable.eventidentifier, stagingTable.eventitemid
);
```
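After the task is created (and resumed), you can confirm it is executing on schedule. A sketch using Snowflake's standard TASK_HISTORY table function:

```sql
-- Recent executions of the merge task: state, scheduled time, and any error
SELECT NAME, STATE, SCHEDULED_TIME, ERROR_MESSAGE
FROM TABLE(QFD.INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'ACCOUNTING_MERGE'))
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;
```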
Cleanup Task
This will clean up all unstructured records from the staging table that have been merged into the final structured table. It runs every Sunday at 10 AM UTC (6AM ET). When creating multiple cleanup tasks, space them out by adjusting the CRON hour (10 in this example) so they are not all running at the same time.
```sql
create or replace task QFD.DEV.ACCOUNTING_CLEANUP
    schedule = 'USING CRON 0 10 * * SUN UTC'
as
DELETE FROM QFD.DEV.ACCOUNTING_STG as stagingTable
USING QFD.DEV.ACCOUNTING as primaryTable
WHERE stagingTable.EVENTPAYLOAD:EventItemId::STRING = primaryTable.EVENTITEMID
    AND stagingTable.EVENTPAYLOAD:EventDateTime::TIMESTAMP_NTZ <= DATEADD(day, -7, current_timestamp());
```

When both tasks have been defined they need to be started:

```sql
ALTER TASK QFD.DEV.ACCOUNTING_CLEANUP RESUME;
ALTER TASK QFD.DEV.ACCOUNTING_MERGE RESUME;
```
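Newly created tasks remain suspended until resumed, so it is worth confirming both are active. A quick check:

```sql
-- The "state" column should read "started" for both tasks once resumed
SHOW TASKS IN SCHEMA QFD.DEV;
```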