A follow up from my previous post on Detecting ETL Duration Anomalies with Simple Linear Regression, I thought I would share how to detect Row Count Anomalies with Simple Linear Regression.
This is very useful to detect when tables row counts all of a sudden grow by an abnormal amount, or are flushed unexpectedly.
You can use the scripts to create the user-defined data type, table valued function and failure log table from my previous post here.
Once you have run the above scripts you can move on to creating the row count history table that will store a history of the row counts.
CREATE TABLE [tests].[TableRecCountHist](
[TableRecCountHistoryID] [int] IDENTITY(1,1) NOT NULL,
[TableTestDate] [date] NOT NULL,
[TableTestTime] [time](7) NOT NULL,
[TableName] [varchar](100) NOT NULL,
[TableRecCount] [int] NOT NULL,
[IsError] [bit] NULL,
[Sensitivity] [decimal](18, 3) NULL,
[CurrCntSigma] [decimal](18, 5) NULL
) ON [PRIMARY]
Next you will create the stored procedure to detect the row count anomalies. This procedure will insert into the history table to keep track of the table row counts in order to work out whether there is an anomaly or not.
CREATE PROC [tests].[tst_TableRowCount]
AS
BEGIN
SET NOCOUNT ON;
/*----------------------------------------------
* Uncomment for debug
------------------------------------------------*/
--IF OBJECT_ID('tempdb..#PreOp') IS NOT NULL DROP TABLE #PreOp
--IF OBJECT_ID('tempdb..#tReport') IS NOT NULL DROP TABLE #tReport
/*----------------------------------------------
* Tunable parameters
------------------------------------------------*/
DECLARE @nDaysHist SMALLINT = 90, -- Set day count to limit history retention
@nSampleMAX SMALLINT = 30, -- Set maximum history count to evalute using simple linear regression
@nSampleMIN SMALLINT = 3, -- Set minimum history count to evalute using simple linear regression
@STDevSensitivity SMALLINT = 6 -- Set a value to increase or decrease the upper and lower threshold
/*-----------------------------------------------
* List tables to exclude
-------------------------------------------------*/
DECLARE @tExceptions TABLE (
FullTableName varchar(200) NOT NULL
)
INSERT INTO @tExceptions
SELECT 'Staging.ops.Address'
UNION
SELECT 'TestDWH.dbo.Address'
/*-----------------------------------------------------------------------------------------------------------------------------------
FLUSH
* Clear history table of records older than @nDaysHist days or records added on current day to get latest stats
*/-----------------------------------------------------------------------------------------------------------------------------------
DELETE
FROM tests.TableRecCountHist
WHERE TableTestDate < DATEADD(DAY, -@nDaysHist, GETDATE())
;
/*-----------------------------------------------------------------------------------------------------------------------------------
GET COUNTS
* Get table record count for all tables from [Staging], [TestDWH] and [BelparkDW] databases
* Counts will be recorded per table, no matter how many times they have been recorded for that day already
*/-----------------------------------------------------------------------------------------------------------------------------------
;WITH TABcte ([object_id], name, database_name, SCHEMA_ID) AS
(
SELECT *
FROM
(
SELECT [object_id], name, database_name = 'Staging', SCHEMA_ID FROM [Staging].sys.tables
UNION ALL
SELECT [object_id], name, database_name = 'TestDWH', SCHEMA_ID FROM [TestDWH].sys.tables
)DT
)
,SCHcte AS
(
SELECT database_name = 'Staging', [schema_id], name FROM [Staging].sys.schemas
UNION ALL
SELECT database_name = 'TestDWH', [schema_id], name FROM [TestDWH].sys.schemas
)
,DPScte AS
(
SELECT database_name = 'Staging', [object_id], index_id, row_count FROM [Staging].sys.dm_db_partition_stats
UNION ALL
SELECT database_name = 'TestDWH',[object_id], index_id, row_count FROM [TestDWH].sys.dm_db_partition_stats
)
,TRCcte AS
(
SELECT TableID = TAB.[object_id]
,TableTestDate = CAST(GETDATE() AS DATE)
,TableTestTime = CAST(GETDATE() AS TIME)
,FullName = CONCAT(TAB.database_name, '.', SCH.name, '.', TAB.name)
,TableRecCount = ISNULL(RC.RCount,0)
FROM TABcte TAB
JOIN SCHcte SCH ON TAB.[schema_id] = SCH.[schema_id]
AND TAB.database_name = SCH.database_name
CROSS
APPLY
(
SELECT RCount = SUM (row_count)
FROM DPScte
WHERE [object_id] = TAB.[object_id]
AND (index_id = 0 OR index_id = 1)
AND DPScte.database_name = TAB.database_name
) RC
WHERE TAB.name != '__RefactorLog'
AND TAB.name != 'sysdiagrams'
AND SCH.name NOT IN ('tests','ops','errorlog') --Add Schema names of schemas you do not want tested.
)
INSERT INTO tests.TableRecCountHist (TableTestDate, TableTestTime, TableName, TableRecCount)
SELECT DISTINCT
TableTestDate
,TableTestTime
,FullName
,TableRecCount
FROM TRCcte
/*-----------------------------------------------------------------------------------------------------------------------------------
MAGIC
* Using Simple Linear Regression, issue an estimation for what the row count per table should be "today"
* Based on the regression line, define thresholds that will determine alert
*/-----------------------------------------------------------------------------------------------------------------------------------
SELECT TableName
,X = DATEDIFF(DAY, MIN(TableTestDate) OVER (PARTITION BY TableName), TableTestDate) + 1
,TableTestDate
,Y = TableRecCount
,SampleOrdinal = ROW_NUMBER() OVER (PARTITION BY TableName ORDER BY TableTestDate DESC)
,MeasurementsPerTable = COUNT(*) OVER (PARTITION BY TableName)
INTO #PreOp
FROM
(
SELECT [TableRecCountHistoryID]
,[TableTestDate]
,[TableTestTime]
,trch.[TableName]
,[TableRecCount]
,RecentPerDay = ROW_NUMBER() OVER (PARTITION BY trch.TableName, trch.TableTestDate ORDER BY trch.TableTestTime DESC)
FROM tests.TableRecCountHist trch
WHERE trch.IsError IS NULL
AND trch.TableTestDate < CAST(GETDATE() AS DATE)
AND NOT EXISTS (
SELECT *
FROM @tExceptions ex
WHERE trch.TableName = ex.FullTableName
)
)DT
WHERE RecentPerDay = 1
DECLARE @tXYData XYDataPoints
INSERT INTO @tXYData(GroupID,X,Y)
SELECT TableName
,X
,Y
FROM #PreOp
WHERE SampleOrdinal <= @nSampleMAX AND MeasurementsPerTable >= @nSampleMIN
/*-----------------------------------------------
* Table to use for updating the error log
-----------------------------------------------*/
DECLARE @ErrorOutput TABLE (
FullTableName varchar(200) NOT NULL
)
;WITH Pruned AS
(
SELECT TableName
,X
,TableTestDate
,Y
,SampleOrdinal
,MeasurementsPerTable
FROM #PreOp pO
WHERE EXISTS (
SELECT *
FROM @tXYData tD
WHERE tD.GroupID = pO.TableName
)
)
,FNResponse AS
(
SELECT GroupID
,Slope
,Intercept
,StDevError
FROM dbo.fn_GetSimpleLinearRegression(@tXYData)
)
,TodayXCTE AS
(
SELECT TableName
,X
,TableTestDate
,Y
,SampleOrdinal
,MeasurementsPerTable
,GroupID
,Slope
,Intercept
,StDevError
,PredictedY = ( Slope * (
DATEDIFF(DAY, MIN(TableTestDate) OVER (PARTITION BY TableName), GETDATE()) + 1
)
) + Intercept
FROM Pruned P
JOIN FNResponse F ON P.TableName = F.GroupID
)
,ThresholdCTE AS
(
SELECT DISTINCT
TableName
,PredictedY
,Y
,UpperThreshold = PredictedY + (@STDevSensitivity * STDevError)
,LowerThreshold = PredictedY - (@STDevSensitivity * STDevError)
,StDevError
FROM TodayXCTE
WHERE SampleOrdinal = 1
)
,TblRecCntHistCTE AS
(
SELECT TableName
,TableTestDate
,TableRecCount
,IsError
,Sensitivity
,CurrCntSigma
,RecentPerDay = ROW_NUMBER() OVER (PARTITION BY trch.TableName, trch.TableTestDate ORDER BY trch.TableTestTime DESC)
FROM tests.TableRecCountHist trch
)
/*-----------------------------------------------
* Update only most recent counts in history
table
-----------------------------------------------*/
UPDATE trch
SET IsError = 1
,Sensitivity = @STDevSensitivity
,CurrCntSigma = CEILING(ABS(trch.TableRecCount - ths.PredictedY) / NULLIF(ths.StDevError,0))
/*-----------------------------------------------
* Output updated values to table variable
for later use
-----------------------------------------------*/
OUTPUT INSERTED.TableName
INTO @ErrorOutput
FROM TblRecCntHistCTE trch
LEFT
JOIN ThresholdCTE ths ON trch.TableName = ths.TableName
WHERE trch.RecentPerDay = 1
AND trch.TableTestDate = CAST(GETDATE() AS DATE)
AND trch.IsError IS NULL
AND (
(trch.TableRecCount NOT BETWEEN ths.LowerThreshold AND ths.UpperThreshold)
OR trch.TableRecCount = 0
)
/*-----------------------------------------------
* Log entry to failure log
-----------------------------------------------*/
INSERT INTO tests.TestFailureLog (TestDate, TestTime, TestCollection, TestName)
SELECT TestDate = CAST(GETDATE() AS DATE)
,TestTime = CAST(GETDATE() AS TIME)
,TestCollection = 'TableRowCount'
,FullTableName
FROM @ErrorOutput EO
WHERE NOT EXISTS (
SELECT *
FROM tests.TestFailureLog TL
WHERE TL.TestDate = CAST(GETDATE() AS DATE)
AND TL.TestCollection = 'TableRowCount'
AND TL.TestName = EO.FullTableName
)
END
GO
If you have multiple test cases in a test schema you can create another proc that will execute all the procs in a specific schema. Then you can create a job that executes this procedure and have it email out the results of the failure log table every day.
CREATE PROC [tests].[RunTestSuite]
@Debug INT = 0
AS
BEGIN
DECLARE @i SMALLINT = 0,
@DynamicSQL NVARCHAR(500) = '',
@CurrentLogID INT = 0
CREATE TABLE #TestsToRun
(
TestOrdinal SMALLINT,
TestProcName VARCHAR(50)
)
INSERT INTO #TestsToRun
(
TestOrdinal,
TestProcName
)
SELECT TestOrdinal = ROW_NUMBER() OVER (ORDER BY ROUTINE_NAME)
,TestProcName = CONCAT(ROUTINE_SCHEMA, '.', ROUTINE_NAME)
FROM INFORMATION_SCHEMA.ROUTINES
WHERE ROUTINE_SCHEMA = 'tests'
AND ROUTINE_NAME LIKE 'tst_%'
SET @i = @@ROWCOUNT
SET @CurrentLogID = IDENT_CURRENT('tests.TestFailureLog')
WHILE @i > 0
BEGIN
SELECT @DynamicSQL = CONCAT('EXECUTE ', TestProcName)
FROM #TestsToRun
WHERE TestOrdinal = @i
IF @Debug = 1
BEGIN
PRINT @DynamicSQL
END ELSE
BEGIN
EXECUTE sp_executesql @DynamicSQL
END
SET @i = @i - 1;
END
SELECT TestFailureLogID
,TestDate
,TestTime
,TestCollection
,TestName
FROM tests.TestFailureLog
WHERE TestFailureLogID > @CurrentLogID
END
A special mention to fellow colleagues that worked on this code as well, Mark Diamond, Ivor Williams and Janlo Du Toit.
Hope this was as useful for you as it was for us! You can download the script file for this post here.
