Detecting Row Count Anomalies with Simple Linear Regression

A follow up from my previous post on Detecting ETL Duration Anomalies with Simple Linear Regression, I thought I would share how to detect Row Count Anomalies with Simple Linear Regression.

This is very useful to detect when tables row counts all of a sudden grow by an abnormal amount, or are flushed unexpectedly.

You can use the scripts to create the user-defined data type, table valued function and failure log table from my previous post here.

Once you have run the above scripts you can move on to creating the row count history table that will store a history of the row counts.

CREATE TABLE [tests].[TableRecCountHist](
	[TableRecCountHistoryID] [int] IDENTITY(1,1) NOT NULL,
	[TableTestDate] [date] NOT NULL,
	[TableTestTime] [time](7) NOT NULL,
	[TableName] [varchar](100) NOT NULL,
	[TableRecCount] [int] NOT NULL,
	[IsError] [bit] NULL,
	[Sensitivity] [decimal](18, 3) NULL,
	[CurrCntSigma] [decimal](18, 5) NULL
) ON [PRIMARY]

Next you will create the stored procedure to detect the row count anomalies. This procedure will insert into the history table to keep track of the table row counts in order to work out whether there is an anomaly or not.

CREATE PROC [tests].[tst_TableRowCount]
AS
BEGIN
	SET NOCOUNT ON;
	/*----------------------------------------------	
	  * Uncomment for debug
	------------------------------------------------*/
	 --IF OBJECT_ID('tempdb..#PreOp') IS NOT NULL DROP TABLE #PreOp
	 --IF OBJECT_ID('tempdb..#tReport') IS NOT NULL DROP TABLE #tReport

	/*----------------------------------------------
	  * Tunable parameters
	------------------------------------------------*/
	DECLARE @nDaysHist			SMALLINT	= 90,	-- Set day count to limit history retention 
			@nSampleMAX			SMALLINT	= 30,	-- Set maximum history count to evalute using simple linear regression
			@nSampleMIN			SMALLINT	= 3,	-- Set minimum history count to evalute using simple linear regression
			@STDevSensitivity	SMALLINT	= 6		-- Set a value to increase or decrease the upper and lower threshold

	/*-----------------------------------------------
	  *	List tables to exclude
	-------------------------------------------------*/
	DECLARE @tExceptions TABLE	(
									FullTableName varchar(200) NOT NULL
								)

	INSERT INTO @tExceptions
	SELECT 'Staging.ops.Address' 
	UNION
	SELECT 'TestDWH.dbo.Address'

	/*-----------------------------------------------------------------------------------------------------------------------------------
	  FLUSH                       
		* Clear history table of records older than @nDaysHist days or records added on current day to get latest stats
	*/-----------------------------------------------------------------------------------------------------------------------------------
	DELETE
	FROM tests.TableRecCountHist
	WHERE TableTestDate < DATEADD(DAY, -@nDaysHist, GETDATE()) 
	;

	/*-----------------------------------------------------------------------------------------------------------------------------------
	  GET COUNTS                                             
		* Get table record count for all tables from [Staging], [TestDWH] and [BelparkDW] databases
		* Counts will be recorded per table, no matter how many times they have been recorded for that day already
	*/-----------------------------------------------------------------------------------------------------------------------------------
	;WITH TABcte ([object_id], name, database_name, SCHEMA_ID) AS
	(
		SELECT *
		FROM
		(
			SELECT [object_id], name, database_name = 'Staging', SCHEMA_ID FROM [Staging].sys.tables
			UNION ALL
			SELECT [object_id], name, database_name = 'TestDWH',  SCHEMA_ID FROM [TestDWH].sys.tables 
		)DT
	)
	,SCHcte AS
	(
		SELECT database_name = 'Staging', [schema_id], name FROM [Staging].sys.schemas 
		UNION ALL
		SELECT database_name = 'TestDWH',	[schema_id], name FROM [TestDWH].sys.schemas
	)
	,DPScte AS
	(
		SELECT database_name = 'Staging', [object_id], index_id, row_count FROM [Staging].sys.dm_db_partition_stats 
		UNION ALL
		SELECT database_name = 'TestDWH',[object_id], index_id, row_count FROM [TestDWH].sys.dm_db_partition_stats
	)
	,TRCcte AS
	(
		SELECT	 TableID		= TAB.[object_id]
				,TableTestDate	= CAST(GETDATE() AS DATE) 
				,TableTestTime	= CAST(GETDATE() AS TIME)	
				,FullName		= CONCAT(TAB.database_name, '.', SCH.name, '.', TAB.name)
				,TableRecCount	= ISNULL(RC.RCount,0) 
		FROM TABcte		TAB
		JOIN SCHcte		SCH		ON	TAB.[schema_id] = SCH.[schema_id]
								AND	TAB.database_name = SCH.database_name
		CROSS 
		APPLY
		(
			SELECT RCount = SUM (row_count)
			FROM DPScte
			WHERE [object_id] = TAB.[object_id]
			AND (index_id = 0 OR index_id = 1)
			AND DPScte.database_name = TAB.database_name
		)				RC
		WHERE TAB.name != '__RefactorLog'
		AND TAB.name != 'sysdiagrams'
		AND SCH.name NOT IN ('tests','ops','errorlog') --Add Schema names of schemas you do not want tested.
	)

	INSERT INTO tests.TableRecCountHist (TableTestDate, TableTestTime, TableName, TableRecCount)
	SELECT DISTINCT
			 TableTestDate
			,TableTestTime
			,FullName
			,TableRecCount
	FROM TRCcte	

	/*-----------------------------------------------------------------------------------------------------------------------------------
	  MAGIC
		* Using Simple Linear Regression, issue an estimation for what the row count per table should be "today"
		* Based on the regression line, define thresholds that will determine alert
	*/-----------------------------------------------------------------------------------------------------------------------------------
	SELECT	 TableName
			,X						= DATEDIFF(DAY, MIN(TableTestDate) OVER (PARTITION BY TableName), TableTestDate) + 1
			,TableTestDate
			,Y						= TableRecCount
			,SampleOrdinal			= ROW_NUMBER() OVER (PARTITION BY TableName ORDER BY TableTestDate DESC) 
			,MeasurementsPerTable	= COUNT(*) OVER (PARTITION BY TableName)
	INTO #PreOp
	FROM
	(
		SELECT	 [TableRecCountHistoryID]
				,[TableTestDate]
				,[TableTestTime]
				,trch.[TableName]
				,[TableRecCount]
				,RecentPerDay	= ROW_NUMBER() OVER (PARTITION BY trch.TableName, trch.TableTestDate ORDER BY trch.TableTestTime DESC)
		FROM tests.TableRecCountHist	trch
		WHERE trch.IsError IS NULL
		AND trch.TableTestDate < CAST(GETDATE() AS DATE)
		AND NOT EXISTS	(
							SELECT *
							FROM @tExceptions ex
							WHERE trch.TableName = ex.FullTableName 
						)
	)DT
	WHERE RecentPerDay = 1

	DECLARE @tXYData XYDataPoints
	INSERT INTO @tXYData(GroupID,X,Y)
	SELECT	 TableName
			,X
			,Y
	FROM #PreOp
	WHERE SampleOrdinal <= @nSampleMAX AND MeasurementsPerTable >= @nSampleMIN

	/*-----------------------------------------------
	  * Table to use for updating the error log
	-----------------------------------------------*/
	DECLARE @ErrorOutput TABLE	( 
									FullTableName varchar(200) NOT NULL
								)

	;WITH Pruned AS
	(
		SELECT	 TableName
				,X						
				,TableTestDate
				,Y						
				,SampleOrdinal			
				,MeasurementsPerTable	
		FROM #PreOp pO
		WHERE	EXISTS	(
							SELECT *
							FROM @tXYData	tD
							WHERE tD.GroupID = pO.TableName	
						)
	)
	,FNResponse AS
	(
		SELECT	 GroupID				
				,Slope				
				,Intercept			
				,StDevError			
		FROM dbo.fn_GetSimpleLinearRegression(@tXYData)
	)
	,TodayXCTE AS
	(
		SELECT	 TableName
				,X						
				,TableTestDate
				,Y						
				,SampleOrdinal			
				,MeasurementsPerTable
				,GroupID				
				,Slope				
				,Intercept			
				,StDevError		
				,PredictedY	=	(	Slope * (
												DATEDIFF(DAY, MIN(TableTestDate) OVER (PARTITION BY TableName), GETDATE()) + 1
											)
								) + Intercept
		FROM Pruned		P
		JOIN FNResponse	F	ON P.TableName = F.GroupID
	)
	,ThresholdCTE AS
	(
		SELECT DISTINCT
				 TableName
				,PredictedY
				,Y
				,UpperThreshold	= PredictedY + (@STDevSensitivity * STDevError)
				,LowerThreshold	= PredictedY - (@STDevSensitivity * STDevError)
				,StDevError
		FROM TodayXCTE
		WHERE SampleOrdinal = 1
	)
	,TblRecCntHistCTE AS
	(
		SELECT	 TableName
				,TableTestDate
				,TableRecCount
				,IsError
				,Sensitivity
				,CurrCntSigma
				,RecentPerDay	= ROW_NUMBER() OVER (PARTITION BY trch.TableName, trch.TableTestDate ORDER BY trch.TableTestTime DESC)
		FROM tests.TableRecCountHist	trch
	)
	/*-----------------------------------------------
	  * Update only most recent counts in history 
		table
	-----------------------------------------------*/
	UPDATE trch
	SET  IsError		= 1
		,Sensitivity	= @STDevSensitivity
		,CurrCntSigma	= CEILING(ABS(trch.TableRecCount - ths.PredictedY) / NULLIF(ths.StDevError,0))
	/*-----------------------------------------------
	  * Output updated values to table variable 
		for later use 
	-----------------------------------------------*/
	OUTPUT	INSERTED.TableName
	INTO	@ErrorOutput

	FROM TblRecCntHistCTE	trch
	LEFT
	JOIN ThresholdCTE		ths	ON trch.TableName = ths.TableName
	WHERE	trch.RecentPerDay = 1
		AND trch.TableTestDate = CAST(GETDATE() AS DATE)
		AND trch.IsError IS NULL
		AND (
				(trch.TableRecCount NOT BETWEEN ths.LowerThreshold AND ths.UpperThreshold) 
				OR trch.TableRecCount = 0
			)

	/*-----------------------------------------------
	  * Log entry to failure log
	-----------------------------------------------*/
	INSERT INTO tests.TestFailureLog (TestDate, TestTime, TestCollection, TestName)		
	SELECT	 TestDate			= CAST(GETDATE() AS DATE)
			,TestTime			= CAST(GETDATE() AS TIME)
			,TestCollection		= 'TableRowCount'
			,FullTableName
	FROM @ErrorOutput	EO
	WHERE NOT EXISTS	(
							SELECT *
							FROM tests.TestFailureLog	TL
							WHERE TL.TestDate		= CAST(GETDATE() AS DATE) 
							AND TL.TestCollection	= 'TableRowCount'
							AND TL.TestName			= EO.FullTableName
						)
END
GO

If you have multiple test cases in a test schema you can create another proc that will execute all the procs in a specific schema. Then you can create a job that executes this procedure and have it email out the results of the failure log table every day.

CREATE PROC [tests].[RunTestSuite] 
	@Debug INT = 0
AS
BEGIN
	DECLARE @i				SMALLINT		= 0,
			@DynamicSQL		NVARCHAR(500)	= '',
			@CurrentLogID	INT				= 0

	CREATE TABLE #TestsToRun
	(
		TestOrdinal		SMALLINT,
		TestProcName	VARCHAR(50)	
	)

	INSERT INTO #TestsToRun
	(
		TestOrdinal,
		TestProcName
	)
	SELECT	 TestOrdinal	= ROW_NUMBER() OVER (ORDER BY  ROUTINE_NAME)
			,TestProcName	= CONCAT(ROUTINE_SCHEMA, '.', ROUTINE_NAME)
	FROM INFORMATION_SCHEMA.ROUTINES	
	WHERE ROUTINE_SCHEMA = 'tests'
	AND ROUTINE_NAME LIKE 'tst_%'

	SET @i = @@ROWCOUNT

	SET @CurrentLogID = IDENT_CURRENT('tests.TestFailureLog')

	WHILE @i > 0
	BEGIN
		SELECT @DynamicSQL = CONCAT('EXECUTE ', TestProcName)
		FROM #TestsToRun 
		WHERE TestOrdinal = @i		
		
		IF @Debug = 1 
		BEGIN
			PRINT @DynamicSQL
		END ELSE
		BEGIN
			EXECUTE sp_executesql @DynamicSQL
		END
	
		SET @i = @i - 1;
	END

	

	SELECT	 TestFailureLogID
			,TestDate
			,TestTime
			,TestCollection
			,TestName
	FROM tests.TestFailureLog
	WHERE TestFailureLogID > @CurrentLogID

END

A special mention to fellow colleagues that worked on this code as well, Mark Diamond, Ivor Williams and Janlo Du Toit.

Hope this was as useful for you as it was for us! You can download the script file for this post here.

tumblr_n1mh7i9xZq1rqvcu2o1_400

Detecting ETL Duration Anomalies with Simple Linear Regression

Thought I would share a piece on detecting ETL duration anomalies using Simple Linear Regression. This post will give you a brief overview of what simple linear regression is and how you can use it to your advantage with testing. In this post I will be demonstrating how to use Simple Linear Regression to detect ETL duration Anomalies.

What happens with ETLs is that they normally run for a set amount of time, give or take a few minutes every time. Then all of a sudden it starts running extremely long. If you have nothing in place to check this, other than looking at it manually each day, then this could be your solution.

Let me begin by explaining what Simple Linear Regression is.

The Simple Linear Regression model is used in statistics, where a straight line is plotted through a set of points on a graph. It is plotted in such a way that makes the sum of squared residuals of the model as small as possible.

In order to plot the line on the graph you need to know where the line intercepts on the y axis and the slope of the line. You will also need to set the standard deviation to detect when something is in fact an anomaly. See Figure below.
LR

Step 1:
To start you will need to create a user-defined data type to accept the values of the X and Y data points.

CREATE TYPE dbo.XYDataPoints AS TABLE
(
	GroupID VARCHAR(200) NOT NULL,
	X FLOAT NULL,
	Y FLOAT NULL
)

Step 2:
You need to create a table-valued function which will calculate the simple linear regression of the ETL’s durations.

CREATE FUNCTION [dbo].[fn_GetSimpleLinearRegression] (@MyInput AS XYDataPoints READONLY)

RETURNS @MyOutput TABLE 
(
	-- Columns returned by the function
	 GroupID			VARCHAR(200)
	,Slope				FLOAT
	,Intercept			FLOAT
	,StDevError			DECIMAL(18,5)
)
AS 

BEGIN

	;WITH BASE AS -- Check for NULL source
	(	
		SELECT	 GroupID
				,X
				,Y
		FROM @MyInput
	)
	,XYBAR AS
	(
		SELECT	GroupID
				,Y
				,yBar = AVG(Y) OVER(PARTITION BY GroupID ORDER BY X ROWS BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING)
				,X
				,xBar = AVG(X) OVER(PARTITION BY GroupID ORDER BY X ROWS BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING)
				,xCount	= COUNT(X) OVER(PARTITION BY GroupID ORDER BY X ROWS BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING)
		FROM BASE
	)
	,SLOPE AS
	(
		SELECT	GroupID
				,Slope = SUM((X - xBar) * (Y - yBar)) / SUM(POWER(X - xBar,2))
				,MAX(yBar) AS yBar
				,MAX(xBar) AS xBar
				,MAX(xCount) AS xCount
		FROM XYBAR
		GROUP BY GroupID
	)
	,INTERCEPT AS
	(
		SELECT	 GroupID
				,Slope
				,xBar
				,yBar
				,xCount
				,Intercept = yBar - xBar * Slope
		FROM SLOPE
	)
	INSERT @MyOutput
	SELECT	 INTERCEPT.GroupID
			,Slope
			,Intercept
			,StDevError = STDEV	(	
									ABS(y - ((Slope * (x)) + Intercept))
								)
	FROM BASE
	JOIN INTERCEPT ON BASE.GroupID = INTERCEPT.GroupID
	GROUP BY Slope,Intercept, INTERCEPT.GroupID

	RETURN
END

Step 3:
You need to create a table to contain a history of the duration of ETLs. I chose to create a tests schema and to create all my objects for this detection under the tests schema.

CREATE TABLE [tests].[ETLLogDurationHistory](
	[ETLLogDurationHistoryID] [int] IDENTITY(1,1) NOT NULL,
	[ETLName] [varchar](250) NOT NULL,
	[DateRun] [date] NOT NULL,
	[TimeRun] [time](7) NOT NULL,
	[Duration] [time](0) NOT NULL,
	[DurationInt] [int] NOT NULL,
	[ETLOutcome] [varchar](250) NOT NULL,
	[IsError] [bit] NULL,
	[Sensitivity] [decimal](18, 3) NULL,
	[CurrCntSigma] [decimal](18, 5) NULL,
 CONSTRAINT [PK_ETLLogDurationHistory] PRIMARY KEY CLUSTERED 
(
	[ETLLogDurationHistoryID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

Step 4:
You need to create a table to store the error details that were detected for the ETL duration anomalies.

CREATE TABLE [tests].[TestFailureLog](
	[TestFailureLogID] [int] IDENTITY(1,1) NOT NULL,
	[TestDate] [date] NOT NULL,
	[TestTime] [time](7) NOT NULL,
	[TestCollection] [varchar](100) NOT NULL,
	[TestName] [varchar](300) NOT NULL,
 CONSTRAINT [PK_TestFailureLog] PRIMARY KEY CLUSTERED 
(
	[TestFailureLogID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

Step 5:
Create the proc to detect the ETL duration anomalies. This proc will also insert into the history table to keep track of how long ETLs ran for in order to work out whether there is an anomaly or not.

CREATE PROC [tests].[tst_ETLDurationAnomaly] 
AS
BEGIN
	SET NOCOUNT ON;
	/*----------------------------------------------	
	  * Uncomment for debug
	------------------------------------------------*/
	 --IF OBJECT_ID('tempdb..#PreOp') IS NOT NULL DROP TABLE #PreOp
	 --IF OBJECT_ID('tempdb..#tReport') IS NOT NULL DROP TABLE #tReport

	/*----------------------------------------------
	  * Tunable parameters
	------------------------------------------------*/
	DECLARE @nDaysHist			SMALLINT	= 90,	-- Set day count to limit history retention 
			@nSampleMAX			SMALLINT	= 30,	-- Set maximum history count to evalute using simple linear regression
			@nSampleMIN			SMALLINT	= 3,	-- Set minimum history count to evalute using simple linear regression
			@STDevSensitivity	SMALLINT	= 6		-- Set a value to increase or decrease the upper and lower threshold
			
	/*-----------------------------------------------------------------------------------------------------------------------------------
		* FLUSH TABLES                    
		* Clear history table of records older than @nDaysHist days or records added on current day to get latest stats
	-----------------------------------------------------------------------------------------------------------------------------------*/
	DELETE
	FROM tests.ETLLogDurationHistory
	WHERE DateRun < DATEADD(DAY, -@nDaysHist, GETDATE()) ; 
        /*----------------------------------------------------------------------------------------------------------------------------------- 
                * Get duration for all ETLs * Duration will be recorded per ETL, no matter how many times they have been recorded for that 
                  day already 
        -----------------------------------------------------------------------------------------------------------------------------------*/ 
        INSERT INTO [tests].[ETLLogDurationHistory] (ETLName,DateRun,TimeRun,Duration,DurationInt,ETLOutcome)
	SELECT	  ETLName	=	name
		 ,DateRun	=	CAST(CAST(CONVERT(DATE, CONVERT(VARCHAR(10), run_date)) AS VARCHAR(10)) AS DATE)
		 ,TimeRun	=	CAST(CAST(CONVERT(VARCHAR(8), DATEADD(SECOND, run_time, ''), 114) AS VARCHAR(10)) AS TIME(0))
		 ,Duration	=	CAST(CONVERT(CHAR(8), DATEADD(SECOND, run_duration, ''), 114)  AS TIME(0))
		 ,DurationInt	=	run_duration
		 ,ETLOutcome	=	CASE 
						WHEN SJH.run_status=0 THEN 'Failed'
						WHEN SJH.run_status=1 THEN 'Succeeded'
						WHEN SJH.run_status=2 THEN 'Retry'
						WHEN SJH.run_status=3 THEN 'Cancelled'
						ELSE 'Unknown'  
						END 
	FROM MSDB.dbo.sysjobhistory	SJH  
	JOIN MSDB.dbo.sysjobs		SJ ON SJH.job_id = SJ.job_id  
	WHERE step_id = 0  
	AND	name NOT IN (
				'collection_set_1_noncached_collect_and_upload'
				,'collection_set_2_upload'
				,'collection_set_3_upload'
				,'mdw_purge_data_[BI_MDW]'
				,'SSIS Server Maintenance Job'
				,'sysutility_get_cache_tables_data_into_aggregate_tables_hourly'
				,'sysutility_get_views_data_into_cache_tables'
				,'syspolicy_purge_history'
				,'sysutility_get_cache_tables_data_into_aggregate_tables_daily'
			    )
	AND	CAST(CONVERT(DATE, CONVERT(VARCHAR(10), run_date)) AS VARCHAR(10)) + ' ' 
		+ CAST(CONVERT(VARCHAR(8), DATEADD(SECOND, run_time, ''), 114) AS VARCHAR(10)) >= DATEADD(D,-1,GetDate())  
	ORDER BY name,run_date,run_time  

	/*-----------------------------------------------------------------------------------------------------------------------------------
	    * MAGIC
		* Using Simple Linear Regression, issue an estimation for what the Duration per ETL should be "today"
		* Based on the regression line, define thresholds that will determine alert

	*/-----------------------------------------------------------------------------------------------------------------------------------
	SELECT	 ETLName
			,X						= DATEDIFF(DAY, MIN(DateRun) OVER (PARTITION BY ETLName), DateRun) + 1
			,DateRun
			,Y						= DurationInt
			,SampleOrdinal			= ROW_NUMBER() OVER (PARTITION BY ETLName ORDER BY DateRun DESC) 
			,MeasurementsPerTable	= COUNT(*) OVER (PARTITION BY ETLName)
	INTO #PreOp
	FROM
	(
		SELECT	 ETLLogDurationHistoryID
				,ETLName
				,DateRun
				,TimeRun
				,Duration
				,DurationInt
				,ETLOutcome
				,RecentPerDay	= ROW_NUMBER() OVER (PARTITION BY ETLName, DateRun ORDER BY TimeRun DESC)
		FROM tests.ETLLogDurationHistory	EDH
		WHERE EDH.DateRun < CAST(GETDATE() AS DATE)
	)DT
	WHERE RecentPerDay = 1

	DECLARE @tXYData XYDataPoints
	INSERT INTO @tXYData(GroupID,X,Y)
	SELECT	 ETLName
			,X
			,Y
	FROM #PreOp
	WHERE SampleOrdinal <= @nSampleMAX AND MeasurementsPerTable >= @nSampleMIN

	/*-----------------------------------------------
	  * Table to use for updating the error log
	-----------------------------------------------*/
	DECLARE @ErrorOutput TABLE	( 
						FullTableName VARCHAR(200) NOT NULL
					)

	;WITH Pruned AS
	(
		SELECT	 ETLName
				,X						
				,DateRun
				,Y						
				,SampleOrdinal			
				,MeasurementsPerTable	
		FROM #PreOp pO
		WHERE	EXISTS	(
							SELECT *
							FROM @tXYData	tD
							WHERE tD.GroupID = pO.ETLName	
						)
	)
	,FNResponse AS
	(
		SELECT	 GroupID				
				,Slope				
				,Intercept			
				,StDevError			
		FROM dbo.fn_GetSimpleLinearRegression(@tXYData)
	)
	,TodayXCTE AS
	(
		SELECT	 ETLName
				,X						
				,DateRun
				,Y						
				,SampleOrdinal			
				,MeasurementsPerTable
				,GroupID				
				,Slope				
				,Intercept			
				,StDevError		
				,PredictedY	=	(	Slope * (
												--	TodayX
												DATEDIFF(DAY, MIN(DateRun) OVER (PARTITION BY ETLName), GETDATE()) + 1
											)
								) + Intercept
		FROM Pruned		P
		JOIN FNResponse	F	ON P.ETLName = F.GroupID
	)
	,ThresholdCTE AS
	(
		SELECT DISTINCT
				 ETLName
				,PredictedY
				,Y
				,UpperThreshold	= PredictedY + (@STDevSensitivity * STDevError)
				,LowerThreshold	= PredictedY - (@STDevSensitivity * STDevError)
				,StDevError
		FROM TodayXCTE
		WHERE SampleOrdinal = 1
	)
	,TblETLDurHistCTE AS
	(
		SELECT	 ETLName
				,DateRun
				,TimeRun
				,Duration
				,DurationInt
				,ETLOutcome
				,IsError
				,Sensitivity
				,CurrCntSigma
				,RecentPerDay	= ROW_NUMBER() OVER (PARTITION BY ETLName, DateRun ORDER BY TimeRun DESC)
		FROM tests.ETLLogDurationHistory	EDH
	)
	/*-----------------------------------------------
	  * Update only most recent record in history 
		table
	-----------------------------------------------*/
	UPDATE EDH
	SET  IsError		= 1
		,Sensitivity	= @STDevSensitivity
		,CurrCntSigma	= CEILING(ABS(EDH.DurationInt - ths.PredictedY) / NULLIF(ths.StDevError,0))
	/*-----------------------------------------------
	  * Output updated values to table variable 
		for later use 
	-----------------------------------------------*/
	OUTPUT	INSERTED.ETLName
	INTO	@ErrorOutput

	FROM TblETLDurHistCTE	EDH
	LEFT
	JOIN ThresholdCTE		ths	ON EDH.ETLName = ths.ETLName
	WHERE	EDH.RecentPerDay = 1
		AND EDH.DateRun = CAST(GETDATE() AS DATE)
		AND EDH.IsError IS NULL
		AND (
				(EDH.DurationInt NOT BETWEEN ths.LowerThreshold AND ths.UpperThreshold) 
				OR EDH.DurationInt = 0
			)

	/*-----------------------------------------------
	  * Log entry to failure log
	-----------------------------------------------*/
	INSERT INTO tests.TestFailureLog (TestDate, TestTime, TestCollection, TestName)		
	SELECT	 TestDate			= CAST(GETDATE() AS DATE)
			,TestTime			= CAST(GETDATE() AS TIME)
			,TestCollection		= 'ETLDuration'
			,FullTableName
	FROM @ErrorOutput
END

You can download the script file here.

Hope you are able to find this as useful as I do. Feel free to ask any questions in the comment section below!
10421110_647997325276802_799481460399430457_n