Employment spells

tori.vanloenhout
24 June 2024

Module Output

SQL: [IDI_Community].[emp_employment_spells].employment_spells_YYYYMM
SAS: libname cmes ODBC dsn=isi_community_srvprd schema=emp_employment_spells; proc print data = cmes.employment_spells_YYYYMM
How to access a code module in the Data Lab:Read here

Context:

The purpose of this module is to construct spells of employment for people in New Zealand earning wage and salary income. These spells can be used to produce estimates of people who are employed at a given point in time. Employment is inferred from the presence of wage and salary income on PAYE filings from employers to IRD, and therefore does not capture employment outside the view of the New Zealand tax system.

Key Concepts

Employer Monthly Schedule

The primary data source for this module is the ir_clean.ird_ems data set, also known as the Employer Monthly Schedule (EMS). This is a monthly tax return filed by businesses with paid employees, which summarises the monthly wage and salary payments made to each of their employees, and the ‘pay-as-you-earn’ (PAYE) income tax deductions made.

This data begins in April 1999, so all people employed in April 1999 will have an apparent start date for that employee-employer relationship of April 1999 as employment information is not known before this date.

Since April 2019, Inland Revenue has shifted from monthly filings under the EMS to compulsory payday filings under the Employment Information - Employee (EI-E) system. This follows a transition period since April 2018 in which firms could select whether to file monthly or through the payday filing system.

For consistency, Stats NZ has made a processed version of EI-E data available, allocating payday information to calendar months and including these records as part of the EMS tables.

Linking across datasets

The EMS provides the key link between individual information (identified by the snz_uid variable) to that of the enterprise (enterprise_nbr). The raw EMS data relate two confidentialised IR numbers (a payer and a payee), therefore jobs are identified at the tax filing unit level, which is almost always the enterprise level.

Confidentialised payer IR numbers are mapped to enterprise numbers by Statistics NZ, using relationships held on the Business Register. These, in turn, are mapped to permanent enterprise numbers (PENTs), which are an enhanced longitudinal business identifier that use plant level employment tracking to repair broken enterprise number linkages (Fabling 2011).

We have recreated the logic (in line with IDI Community deployment coding standards) that Richard Fabling uses to create the pent_IDI_YYYYMM_RFabling and pent_WP_yr_IDI_YYYYMM_RFabling tables. This allows for a tri-annual update aligned with the refresh cycles.

Similarly, Statistics NZ map confidentialised payee IR numbers to worker ids (snz_uids) which, in turn, link to other person-level collections in the IDI. Thus, ultimately, a job is defined as a worker-firm (snz_uid — pent) relationship observed at a monthly frequency.

Employment

Employees

Self-employment

Business history

Date Historical Context
April-1999 EMS data began in April 1999, so all people employed in April 1999 will have an apparent start date for that employee-employer relationship of April 1999 as employment information is not known before this date.
April-2018 A transition period begins, in which firms could select whether to file tax returns monthly or through the payday filing system.
April-2019 Inland Revenue shifted from monthly filings under the EMS to compulsory payday filing under the Employment Information - Employee (EI-E) system. For consistency, Stats NZ has made a processed version of the EI-E data available, allocating payday information to calendar months and including these records as part of the EMS tables.

References

  1. Fabling, R (2011). Keeping it Together: Tracking Firms in New Zealand’s Longitudinal Business Database. Motu Working Paper 11-01. March 2011. (https://motu-www.motu.org.nz/wpapers/11_01.pdf)
  2. Fabling, R., & Mare, D. C. (2015). Addressing the absence of hours information in linked employer-employee data. Motu Working Paper 15-17. October 2015. (https://motu-www.motu.org.nz/wpapers/15_17.pdf)
  3. Fabling, R., & Sanderson, L. (2016). A Rough Guide to New Zealand’s Longitudinal Business Database 2nd Ed. Motu Working Paper 16-03. February 2016. (https://motu-www.motu.org.nz/wpapers/16_03.pdf)
  4. Fabling, R., & Mare, D. C. (2020). Measuring commute patterns over time. Using administrative data to identify where employees live and work. Motu Wroking Paper 20-05. July 2020. (https://motu-www.motu.org.nz/wpapers/20_05.pdf)

Development Team

Who Agency Involvement
Nafees Anwar MBIE Initial draft of Job spells code.
Marc de Boer MSD Contributor.
Lynda Sanderson Prod Comm Contributor.
Ashleigh Arendt SWA Peer review of code.
Corey Allen MBIE SME review.
Tori Van Loenhout Nicholson Consulting Module coder.

Key Business Rules

Employer identifier

Spell start

The spell start for an employment spell is derived from ir_ems_return_period_date - and is presented as year & month integer. Employment beginning on or before April 1999 will be referenced as 199904. A new employment spell begins for an existing individual-enterprise when there has been a gap in that employment spell of greater than 1-month i.e. there is more than 1 yearmonth not recorded in the ird_ems data set for that combination. A new employment spell will also begin if the individual-enterprise combination changes i.e. a person moves to a new firm. PENTs are also included in the output for researchers wanting to engage in a longitudinal analysis and the data is available if you want to combine spells where the enterprise number changes but the pent stays the same.

Spell end

The spell end for an employment spell is derived from ir_ems_return_period_date - and is presented as year & month integer. Employment current as at the latest recorded filing time for the refresh will be presented as the spell_end - there is no future dating. An employment spell ends for an individual-enterprise combination when there has been a gap in that employment spell of greater than 1-month i.e. there is more than 1 yearmonth not recorded in the ird_ems data set for that combination. Please note, this does not capture if the individual begins a new contract or changes position withing the same enterprise or if they move to a new location. The spell end is or the individual-enterprise relationship.

Working proprietor

A binary indicator for working proprietor. If a person is recognised as a working proprietor in any tax year that is captured within the employment spell, they will be designated as a working proprietor for that spell. Working proprietors are defined as individuals (in characteristics table or W&S data) and either drawing non-PAYE income (IR3 non-zero net profit, IR20 non-zero total income & IR3 with partnership income (latter criteria excludes passive investors), IR4S non-zero remuneration (above a threshold level adjusted for consumer price index)) or being paid a PAYE W&S and payer=payee. Note that the in the ird_ems table working proprietors identified in the employment spells output must have an income source of W&S.

Repeat employer within 12 months

In this context a repeat employer is where there is more than one observation for an employee-employer relationship in the output data set that started or ended 12 months before or after another spell. This does not represent situations where an individual has a continuous relationship with an employer even in circumstances where they have shifted to a different plant within the enterprise, or where there is a new employment agreement of contract e.g. promotion to a new role, contract extension.

Joint Filer

A joint filer is where an employer files collective EMS returns for multiple enterprises, which complicates the identification of the employer-employee relationship and has flow on impacts for geographic allocation of employment, industry classification, and job-transitions between enterprises that have the same joint filer. There is single joint-filer responsible for the Education Service Payroll (ESP), although an ESP specific joint-filer flag is not in scope for this module. If at any time during the employment spell the PENT is identified as a joint filer then the binary indicator will be 1 even if the PENT was not identified as a joint filer for each month in the employment spell.

One month gap count

The one month gap count is derived from counting the number of one month gap adjustments that are allowed for in the business rules of the employment spells. This may be used as a proxy to identify casual employment types where there is an ongoing relationship between the employer and employee but where there may not be sustained employment. This is a separate variable to the repeat employer within 12 months, in that while there may be more than a one month gap in employment over the course of the employment spell the gaps will not be consecutive.

Granularity

The output data set for this code module is one row per snz_uid, pent, enterprise_nbr, spell_start and spell_end. Due to the inclusion of enterprise number in the business key, spells are split where there is a one-to-many relationship between pent and enterprise. Associated event information variables are also at the person-enterprise level. The code can be modified by researchers, where interested, to exclude enterprise an make an uid_pent_index rather than a uid_ent_index.

Parameters

The following parameters should be supplied to this module to run it in the database:

  1. {targetdb}: The SQL database on which the spell data set is to be created.
  2. {idicleanrefresh}: The refresh version database that you want to use.
  3. {targetschema}: The project schema under the target database into which the spell data sets are to be created.
  4. {yearmonth}: The refresh version of the data that you want to use applicable to data sets other than IDI_Clean.

Dependencies

{idicleanrefresh}.[ir_clean].[ird_ems] 
{idicleanrefresh}.[data].[income_pbn_ent] 
{idicleanrefresh}.[security].[concordance]
{idicleanrefresh}.[br_clean].[enterprise]
{idicleanrefresh}.[ir_clean].[ird_customers]
{idicleanrefresh}.[br_clean].[ird_enterprise_xref]
{idicleanrefresh}.[ir_clean].[ird_rtns_keypoints_ir3]
{idicleanrefresh}.[ir_clean].[ird_attachments_ir20]
{idicleanrefresh}.[ir_clean].[ird_attachments_ir4s]
[LBD_Clean].[lbf_clean].[load_lbf_fact_enterprise]
[LBD_Clean].[reference].[dim_bal_date_year]
[LBD_Clean].[gst_clean].[load_gst_return]
[LBD_Clean].[lbf_clean].[load_lbf_enterprise_ird_link]
[LBD_Clean].[i10_clean].[fact_i10_enterprise_year_pre2013]
[LBD_Clean].[i10_clean].[fact_i10_enterprise_year]
 

Outputs

IDI_Sandpit.{targetschema}.[Employment_spells]

Variable Descriptions

The business key for this spell table is one row per snz_uid, pent, enterprise_nbr, spell_start and spell_end.

Aspect Variables Description
Entity snz_uid The unique STATSNZ person identifier for the person.
pent Permanent enterprise number.
enterprise_nbr Enterprise number.
Source data_source ird_ems.
Period spell_start The start date for the employment spell derived from ir_ems_return_period_date - presented as yearmonth. Employment beginning on or before April 1999 will be referenced as 199904.
spell_end The end date for the employment spell derived from ir_ems_return_period_date - presented as yearmonth. Employment current as at refresh date will be end-dated as the latest employment records in the refresh.
Event information working_proprietor A binary indicator for working proprietor. If a person is recognised as a working proprietor in any tax year that is captured within the employment spell, they will be designated as a working proprietor for that spell.
repeat_employer_12m Derived from snz_uid and employer_id. Where an employer_id is repeated more than once per snz_uid within a 12 month period they get a flag for being a repeat employer. This is to help determine seasonal employment.
joint_filer A binary indicator of when an employer files collective EMS
one_month_gap_count Derived variable that provides the number of one month gaps in

Module Version & Change History

Date Version Comments
June 2024 Initial version based on specifications from Commissioning document.
March 2025 Updated to remove dependencies on the Fabling tables to support tri-annual refresh cycles.

Code

/* Set Parameters */

/*PARAMETERS
SQLCMD only (Activate by clicking Query->SQLCMD Mode)
*/

:setvar targetdb "{targetdb}"
:setvar idicleanversion "{idicleanversion}"  
:setvar targetschema "targetschema"
:setvar yyyymm "{yyyymm}"
:setvar tail1 "{tail1}"
:setvar tail2 "{tail2}"
:setvar pbn_excl "{pbn_excl}"
:setvar empid_excl "{empid_excl}"
:setvar wp_excl "{wp_excl}"
:setvar current_year "{current_year}"

/* Assign the target database to which all the components need to be created in. */
USE IDI_UserCode;
GO

/* repairs EX/NULL values */ 

DROP TABLE IF EXISTS #s1_for_repair_EXNULL;
CREATE TABLE #s1_for_repair_EXNULL(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL);

INSERT INTO #s1_for_repair_EXNULL(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key)

SELECT DISTINCT
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key = year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date)
FROM [$(idicleanversion)].[ir_clean].[ird_ems]
WHERE ■■■■■■■■■■■■■■■					
AND ir_ems_income_source_code = 'W&S'	
AND ir_ems_gross_earnings_amt > 0
AND (ir_ems_enterprise_nbr IS NULL OR ir_ems_enterprise_nbr LIKE 'EX%');

/*Only include complete filing months. The number of rows in "for repair" table will vary substantially between instances depending on the number of months with no links. Exclude these "tail" months for repair in later step */

DROP TABLE IF EXISTS #s1_for_repair_tail;
CREATE TABLE #s1_for_repair_tail(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL);

INSERT INTO #s1_for_repair_tail(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key)

SELECT
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
FROM #s1_for_repair_EXNULL
WHERE dim_month_key IN ($(tail1),$(tail2));

DELETE FROM #s1_for_repair_EXNULL
WHERE dim_month_key >= (
SELECT 
min(dim_month_key) 
FROM #s1_for_repair_tail
);

DROP TABLE IF EXISTS #s1_employees_EXNULL;

CREATE TABLE #s1_employees_EXNULL(
snz_ird_uid						int				NOT NULL
PRIMARY KEY CLUSTERED (snz_ird_uid));

INSERT INTO #s1_employees_EXNULL(snz_ird_uid)

SELECT DISTINCT snz_ird_uid
FROM #s1_for_repair_EXNULL;

DROP TABLE IF EXISTS #s1_employers_EXNULL;

CREATE TABLE #s1_employers_EXNULL(
snz_employer_ird_uid			int				NOT NULL
PRIMARY KEY CLUSTERED (snz_employer_ird_uid));

INSERT INTO #s1_employers_EXNULL(snz_employer_ird_uid)

SELECT DISTINCT snz_employer_ird_uid
FROM #s1_for_repair_EXNULL

/*Get all potential links to ENT from EMS jobs*/
DROP TABLE IF EXISTS #s1_ems_all_EXNULL;

CREATE TABLE #s1_ems_all_EXNULL(
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL,
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL
PRIMARY KEY CLUSTERED (snz_employer_ird_uid,dim_month_key,enterprise_nbr,pbn_nbr));

INSERT INTO #s1_ems_all_EXNULL(
snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT DISTINCT
n.snz_employer_ird_uid,
dim_month_key = year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date),
ir_ems_enterprise_nbr,
ir_ems_pbn_nbr
FROM #s1_employers_EXNULL n
JOIN [$(idicleanversion)].[ir_clean].[ird_ems] i
ON n.snz_employer_ird_uid = i.snz_employer_ird_uid
WHERE ■■■■■■■■■■■■■■■					
AND ir_ems_income_source_code = 'W&S'	
AND ir_ems_gross_earnings_amt > 0
AND ir_ems_enterprise_nbr LIKE 'EN%';

/*Exclude PBN associated with permanent allocation issue */
DELETE 
FROM #s1_ems_all_EXNULL
WHERE pbn_nbr = '$(pbn_excl)';

DROP TABLE IF EXISTS #s1_pbn_at_EXNULL;

CREATE TABLE #s1_pbn_at_EXNULL(
snz_ird_uid						int				NOT NULL,
pbn_nbr							char(10)		NOT NULL,
n_rows							int				NOT NULL
PRIMARY KEY CLUSTERED (snz_ird_uid,pbn_nbr));

INSERT INTO #s1_pbn_at_EXNULL(
snz_ird_uid
,pbn_nbr
,n_rows)

SELECT 
n.snz_ird_uid,
ir_ems_pbn_nbr,
n_rows = count(*)
FROM #s1_employees_EXNULL n
JOIN [$(idicleanversion)].[ir_clean].[ird_ems] i
on n.snz_ird_uid = i.snz_ird_uid
WHERE ir_ems_income_source_code = 'W&S'
AND ir_ems_gross_earnings_amt > 0
AND ir_ems_enterprise_nbr LIKE 'EN%'
GROUP BY n.snz_ird_uid,ir_ems_pbn_nbr;

DELETE 
FROM #s1_pbn_at_EXNULL
WHERE pbn_nbr = '$(pbn_excl)'; 

DROP TABLE IF EXISTS #s1_ent_pbn_repair;

CREATE TABLE #s1_ent_pbn_repair(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL,
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL
PRIMARY KEY CLUSTERED (snz_ird_uid,snz_employer_ird_uid,dim_month_key));

/*Employer IR-ENT-PBN employs in month of interest, and worker is ever at PBN Break ties using PBN n_rows, followed by min(PBN) if only one ENT*/
INSERT INTO #s1_ent_pbn_repair(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT 
r.snz_ird_uid,
r.snz_employer_ird_uid,
r.dim_month_key,
enterprise_nbr = min(a.enterprise_nbr),
pbn_nbr = min(a.pbn_nbr) 
FROM #s1_for_repair_EXNULL r
JOIN #s1_ems_all_EXNULL a
ON r.snz_employer_ird_uid = a.snz_employer_ird_uid
AND r.dim_month_key = a.dim_month_key
JOIN #s1_pbn_at_EXNULL p
ON r.snz_ird_uid = p.snz_ird_uid
AND p.pbn_nbr = a.pbn_nbr
JOIN (SELECT 
	r.snz_ird_uid,
	r.snz_employer_ird_uid,
	r.dim_month_key,
	max_n_rows = max(n_rows)
	FROM #s1_for_repair_EXNULL r
	JOIN #s1_ems_all_EXNULL a
	ON r.snz_employer_ird_uid = a.snz_employer_ird_uid
	AND r.dim_month_key = a.dim_month_key
	JOIN #s1_pbn_at_EXNULL p
	ON r.snz_ird_uid = p.snz_ird_uid
	AND p.pbn_nbr = a.pbn_nbr
	GROUP BY r.snz_ird_uid,r.snz_employer_ird_uid,r.dim_month_key) m
ON 	r.snz_ird_uid = m.snz_ird_uid
AND	r.snz_employer_ird_uid = m.snz_employer_ird_uid
AND	r.dim_month_key = m.dim_month_key
AND p.n_rows = m.max_n_rows
GROUP BY r.snz_ird_uid,r.snz_employer_ird_uid,r.dim_month_key
HAVING min(a.enterprise_nbr) = max(a.enterprise_nbr);

/*Take ENT link for employer IR if only one avalible, or take min(PBN) where multiple PBNs available*/
INSERT INTO #s1_ent_pbn_repair(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT 
r.snz_ird_uid,
r.snz_employer_ird_uid,
r.dim_month_key,
enterprise_nbr = min(a.enterprise_nbr),
pbn_nbr = min(a.pbn_nbr) 
FROM #s1_for_repair_EXNULL r
JOIN #s1_ems_all_EXNULL a
ON r.snz_employer_ird_uid = a.snz_employer_ird_uid
AND r.dim_month_key = a.dim_month_key
LEFT JOIN #s1_ent_pbn_repair x
ON 	r.snz_ird_uid = x.snz_ird_uid
AND	r.snz_employer_ird_uid = x.snz_employer_ird_uid
AND	r.dim_month_key = x.dim_month_key
WHERE x.snz_ird_uid IS NULL   
GROUP BY r.snz_ird_uid,r.snz_employer_ird_uid,r.dim_month_key
HAVING min(a.enterprise_nbr) = max(a.enterprise_nbr);

/*Repair links using immediately prior month PBN employment*/

DROP TABLE IF EXISTS #s1_temp_for_repair;

CREATE TABLE #s1_temp_for_repair(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL,
mth_prior						int				NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,snz_employer_ird_uid,mth_prior));

INSERT INTO #s1_temp_for_repair(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,mth_prior)

SELECT
r.snz_ird_uid
,r.snz_employer_ird_uid
,r.dim_month_key
,mth_prior = (DATEPART(YEAR, DATEADD(MONTH, -1, (CONVERT(VARCHAR, r.dim_month_key) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, -1, (CONVERT(VARCHAR, r.dim_month_key) + '01')))
FROM #s1_for_repair_EXNULL r
LEFT JOIN #s1_ent_pbn_repair x
ON 	r.snz_ird_uid = x.snz_ird_uid
AND	r.snz_employer_ird_uid = x.snz_employer_ird_uid
AND	r.dim_month_key = x.dim_month_key	
LEFT JOIN (SELECT DISTINCT 
			snz_employer_ird_uid
			,dim_month_key
			FROM #s1_ems_all_EXNULL) e
ON 	r.snz_employer_ird_uid = e.snz_employer_ird_uid
AND	r.dim_month_key = e.dim_month_key	
WHERE x.snz_ird_uid IS NULL			
AND e.snz_employer_ird_uid IS NULL	
AND r.snz_employer_ird_uid IN (SELECT DISTINCT snz_employer_ird_uid FROM #s1_ems_all_EXNULL); 

/*Remove links that would create violations of ent-pbn relationship (ie, PBN belongs to a single ENT in a month)*/ 

DROP TABLE IF EXISTS #s1_temp_pbn_candidates;

CREATE TABLE #s1_temp_pbn_candidates(
pbn_nbr				char(10)				NOT NULL
PRIMARY KEY CLUSTERED(pbn_nbr));

INSERT INTO #s1_temp_pbn_candidates(pbn_nbr)

SELECT DISTINCT 
pbn_nbr
FROM #s1_ems_all_EXNULL
WHERE snz_employer_ird_uid IN (SELECT snz_employer_ird_uid FROM #s1_temp_for_repair);

DROP TABLE IF EXISTS #s1_ent_pbn_mth;
CREATE TABLE #s1_ent_pbn_mth(
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL,
dim_month_key					int				NOT NULL,
PRIMARY KEY CLUSTERED (pbn_nbr,dim_month_key));

INSERT INTO #s1_ent_pbn_mth(
enterprise_nbr
,pbn_nbr
,dim_month_key)

SELECT DISTINCT
ir_ems_enterprise_nbr
,ir_ems_pbn_nbr
,dim_month_key = year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date)
FROM [$(idicleanversion)].[ir_clean].[ird_ems] e
JOIN #s1_temp_pbn_candidates p
ON e.ir_ems_pbn_nbr = p.pbn_nbr
WHERE e.ir_ems_income_source_code = 'W&S'
AND e.ir_ems_gross_earnings_amt > 0;

DROP TABLE IF EXISTS #s1_temp_ent_pbn_repair;

CREATE TABLE #s1_temp_ent_pbn_repair(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL,
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL
PRIMARY KEY CLUSTERED (snz_ird_uid,snz_employer_ird_uid,dim_month_key));

DECLARE @s1_mth int
SELECT @s1_mth = 199905

DECLARE @s1_last_mth int
SELECT @s1_last_mth = max(dim_month_key) FROM #s1_temp_for_repair

while @s1_mth <= @s1_last_mth
BEGIN
	CREATE TABLE #s1_temp_for_repair_currmth(
	snz_ird_uid						int				NOT NULL,
	snz_employer_ird_uid			int				NOT NULL,
	dim_month_key					int				NOT NULL,
	mth_prior						int				NOT NULL
	PRIMARY KEY CLUSTERED(snz_ird_uid,snz_employer_ird_uid,mth_prior))

	INSERT INTO #s1_temp_for_repair_currmth(
	snz_ird_uid
	,snz_employer_ird_uid
	,dim_month_key
	,mth_prior)

	SELECT
	snz_ird_uid
	,snz_employer_ird_uid
	,dim_month_key
	,mth_prior
	FROM #s1_temp_for_repair
	WHERE dim_month_key = @s1_mth

	/*All potential links to ENT from EMS jobs in prior month*/
	CREATE TABLE #s1_temp_ems_all_EXNULL_currmth(
	snz_employer_ird_uid			int				NOT NULL,
	enterprise_nbr					char(10)		NOT NULL,
	pbn_nbr							char(10)		NOT NULL
	PRIMARY KEY CLUSTERED (snz_employer_ird_uid,enterprise_nbr,pbn_nbr))

	INSERT INTO #s1_temp_ems_all_EXNULL_currmth(snz_employer_ird_uid,enterprise_nbr,pbn_nbr)
	SELECT
	snz_employer_ird_uid
	,enterprise_nbr
	,pbn_nbr
	FROM #s1_ems_all_EXNULL
	WHERE dim_month_key IN (SELECT DISTINCT mth_prior FROM #s1_temp_for_repair_currmth)
	AND snz_employer_ird_uid IN (SELECT DISTINCT snz_employer_ird_uid FROM #s1_temp_for_repair_currmth)

	/*Additional links from prior month looping*/
	INSERT INTO #s1_temp_ems_all_EXNULL_currmth(
	snz_employer_ird_uid
	,enterprise_nbr
	,pbn_nbr)
	SELECT
	x.snz_employer_ird_uid,x.enterprise_nbr,x.pbn_nbr
	FROM (SELECT DISTINCT 
			snz_employer_ird_uid
			,enterprise_nbr
			,pbn_nbr
			FROM #s1_temp_ent_pbn_repair
			WHERE dim_month_key IN (SELECT DISTINCT mth_prior FROM #s1_temp_for_repair_currmth)) x
	LEFT JOIN #s1_temp_ems_all_EXNULL_currmth e
	ON x.snz_employer_ird_uid = e.snz_employer_ird_uid
	AND x.enterprise_nbr = e.enterprise_nbr
	AND x.pbn_nbr = e.pbn_nbr
	WHERE e.snz_employer_ird_uid IS NULL
	AND x.snz_employer_ird_uid IN (SELECT DISTINCT snz_employer_ird_uid FROM #s1_temp_for_repair_currmth)

	/*Remove any link that would violate the PBN rule*/
	DELETE l
	FROM #s1_temp_ems_all_EXNULL_currmth l
	JOIN #s1_ent_pbn_mth e
	ON l.pbn_nbr = e.pbn_nbr
	WHERE dim_month_key = @s1_mth
	AND l.enterprise_nbr <> e.enterprise_nbr

	INSERT INTO #s1_temp_ent_pbn_repair(
	snz_ird_uid
	,snz_employer_ird_uid
	,dim_month_key
	,enterprise_nbr
	,pbn_nbr)

	SELECT 
	r.snz_ird_uid,
	r.snz_employer_ird_uid,
	r.dim_month_key,
	enterprise_nbr = min(a.enterprise_nbr),
	pbn_nbr = min(a.pbn_nbr) 
	FROM #s1_temp_for_repair_currmth r
	JOIN #s1_temp_ems_all_EXNULL_currmth a
	ON r.snz_employer_ird_uid = a.snz_employer_ird_uid
	JOIN #s1_pbn_at_EXNULL p
	ON r.snz_ird_uid = p.snz_ird_uid
	AND p.pbn_nbr = a.pbn_nbr
	JOIN (SELECT 
		r.snz_ird_uid,
		r.snz_employer_ird_uid,
		max_n_rows = max(n_rows)
		FROM #s1_temp_for_repair_currmth r
		JOIN #s1_temp_ems_all_EXNULL_currmth a
		ON r.snz_employer_ird_uid = a.snz_employer_ird_uid
		JOIN #s1_pbn_at_EXNULL p
		ON r.snz_ird_uid = p.snz_ird_uid
		AND p.pbn_nbr = a.pbn_nbr
		GROUP BY r.snz_ird_uid,r.snz_employer_ird_uid) m
	ON 	r.snz_ird_uid = m.snz_ird_uid
	AND	r.snz_employer_ird_uid = m.snz_employer_ird_uid
	AND p.n_rows = m.max_n_rows
	GROUP BY r.snz_ird_uid,r.snz_employer_ird_uid,r.dim_month_key
	HAVING min(a.enterprise_nbr) = max(a.enterprise_nbr)

	INSERT INTO #s1_temp_ent_pbn_repair(
	snz_ird_uid
	,snz_employer_ird_uid
	,dim_month_key
	,enterprise_nbr
	,pbn_nbr)

	SELECT 
	r.snz_ird_uid,
	r.snz_employer_ird_uid,
	r.dim_month_key,
	enterprise_nbr = min(a.enterprise_nbr),
	pbn_nbr = min(a.pbn_nbr) 
	FROM #s1_temp_for_repair_currmth r
	JOIN #s1_temp_ems_all_EXNULL_currmth a
	ON r.snz_employer_ird_uid = a.snz_employer_ird_uid
	LEFT JOIN #s1_temp_ent_pbn_repair x
	ON 	r.snz_ird_uid = x.snz_ird_uid
	AND	r.snz_employer_ird_uid = x.snz_employer_ird_uid
	AND	r.dim_month_key = x.dim_month_key
	WHERE x.snz_ird_uid IS NULL   
	GROUP BY r.snz_ird_uid,r.snz_employer_ird_uid,r.dim_month_key
	HAVING min(a.enterprise_nbr) = max(a.enterprise_nbr)

	DROP TABLE #s1_temp_for_repair_currmth
    DROP TABLE #s1_temp_ems_all_EXNULL_currmth

	set @s1_mth = (DATEPART(YEAR, DATEADD(MONTH, 1, (CONVERT(VARCHAR, @s1_mth) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, 1, (CONVERT(VARCHAR, @s1_mth) + '01'))) 
END

/* Add links to main repair table */ 
INSERT INTO #s1_ent_pbn_repair(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr
FROM #s1_temp_ent_pbn_repair

DROP TABLE IF EXISTS #s1_employers_tail;

CREATE TABLE #s1_employers_tail(
snz_employer_ird_uid			int				NOT NULL
PRIMARY KEY CLUSTERED (snz_employer_ird_uid));

INSERT INTO #s1_employers_tail(snz_employer_ird_uid)

SELECT DISTINCT snz_employer_ird_uid
FROM #s1_for_repair_tail;

/*All potential links to ENT from EMS jobs in tail employers, restricted to final non-tail month*/
DROP TABLE IF EXISTS #s1_ems_recent_tail;

CREATE TABLE #s1_ems_recent_tail(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL
PRIMARY KEY CLUSTERED (snz_employer_ird_uid,snz_ird_uid,enterprise_nbr,pbn_nbr));

INSERT INTO #s1_ems_recent_tail(
snz_ird_uid
,snz_employer_ird_uid
,enterprise_nbr
,pbn_nbr)

SELECT DISTINCT
snz_ird_uid,
n.snz_employer_ird_uid,
ir_ems_enterprise_nbr,
ir_ems_pbn_nbr
FROM #s1_employers_tail n
JOIN [$(idicleanversion)].[ir_clean].[ird_ems] i
ON n.snz_employer_ird_uid = i.snz_employer_ird_uid
WHERE ■■■■■■■■■■■■■■■					
AND ir_ems_income_source_code = 'W&S'	
AND ir_ems_gross_earnings_amt > 0
AND ir_ems_enterprise_nbr LIKE 'EN%'
AND year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date) = (SELECT max(dim_month_key) FROM #s1_for_repair_EXNULL);

/*Add repairs to potential links*/
INSERT INTO #s1_ems_recent_tail(
snz_ird_uid
,snz_employer_ird_uid
,enterprise_nbr
,pbn_nbr)

SELECT 
r.snz_ird_uid
,r.snz_employer_ird_uid
,r.enterprise_nbr
,r.pbn_nbr
FROM #s1_ent_pbn_repair r
LEFT JOIN #s1_ems_recent_tail e
ON r.snz_employer_ird_uid = e.snz_employer_ird_uid
AND r.snz_ird_uid = e.snz_ird_uid
AND r.enterprise_nbr = e.enterprise_nbr
AND r.pbn_nbr = e.pbn_nbr
WHERE e.snz_ird_uid IS NULL 
AND r.dim_month_key = (SELECT max(dim_month_key) FROM #s1_for_repair_EXNULL);

DROP TABLE IF EXISTS #s1_ent_pbn_repair_tail;

CREATE TABLE #s1_ent_pbn_repair_tail(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL,
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL
PRIMARY KEY CLUSTERED (snz_ird_uid,snz_employer_ird_uid,dim_month_key));

INSERT INTO #s1_ent_pbn_repair_tail(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT 
r.snz_ird_uid
,r.snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr
FROM #s1_for_repair_tail r
JOIN #s1_ems_recent_tail e
ON r.snz_employer_ird_uid = e.snz_employer_ird_uid
AND r.snz_ird_uid = e.snz_ird_uid;

/* Take ENT link for employer if only one avalible in last non-tail month, otherwise take min(PBN) where multiple PBNs available*/
INSERT INTO #s1_ent_pbn_repair_tail(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT 
r.snz_ird_uid,
r.snz_employer_ird_uid,
r.dim_month_key,
enterprise_nbr = min(a.enterprise_nbr),
pbn_nbr = min(a.pbn_nbr) 
FROM #s1_for_repair_tail r
JOIN #s1_ems_recent_tail a
ON r.snz_employer_ird_uid = a.snz_employer_ird_uid
LEFT JOIN #s1_ent_pbn_repair_tail x
ON 	r.snz_ird_uid = x.snz_ird_uid
AND	r.snz_employer_ird_uid = x.snz_employer_ird_uid
AND	r.dim_month_key = x.dim_month_key
WHERE x.snz_ird_uid IS NULL 
GROUP BY r.snz_ird_uid,r.snz_employer_ird_uid,r.dim_month_key
HAVING min(a.enterprise_nbr) = max(a.enterprise_nbr);

INSERT INTO #s1_ent_pbn_repair(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr
FROM #s1_ent_pbn_repair_tail;

/* Permanent allocation issue */

DROP TABLE IF EXISTS #s1_for_repair_200912_201001;

CREATE TABLE #s1_for_repair_200912_201001(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL);

INSERT INTO #s1_for_repair_200912_201001(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key)

SELECT DISTINCT
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key = year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date)
FROM [$(idicleanversion)].[ir_clean].[ird_ems]
WHERE ■■■■■■■■■■■■■■■					
AND ir_ems_income_source_code = 'W&S'
AND ir_ems_gross_earnings_amt > 0
AND ((year(ir_ems_return_period_date) = 2009 AND month(ir_ems_return_period_date) = 12)
  OR (year(ir_ems_return_period_date) = 2010 AND month(ir_ems_return_period_date) = 1))
AND snz_employer_ird_uid = '$(empid_excl)'
AND ir_ems_pbn_nbr LIKE 'PX%';

/*Repair using aproximate links (within 6 months)*/

DROP TABLE IF EXISTS #s1_candidate_repair_2009_2010;

CREATE TABLE #s1_candidate_repair_2009_2010(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL,
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL);

INSERT INTO #s1_candidate_repair_2009_2010(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT DISTINCT
i.snz_ird_uid
,i.snz_employer_ird_uid
,dim_month_key = year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date)
,ir_ems_enterprise_nbr
,ir_ems_pbn_nbr
FROM [$(idicleanversion)].[ir_clean].[ird_ems] i
JOIN #s1_for_repair_200912_201001 r
ON i.snz_ird_uid = r.snz_ird_uid
AND i.snz_employer_ird_uid = r.snz_employer_ird_uid
WHERE ir_ems_income_source_code = 'W&S'
AND ir_ems_gross_earnings_amt > 0
AND ((year(ir_ems_return_period_date) = 2009 AND month(ir_ems_return_period_date) >= 6)
  OR (year(ir_ems_return_period_date) = 2010 AND month(ir_ems_return_period_date) <= 7))
AND ir_ems_enterprise_nbr LIKE 'EN%'
AND ir_ems_pbn_nbr LIKE 'PB%';

DROP TABLE IF EXISTS #s1_temp_pbn_candidates_2009_2010;

CREATE TABLE #s1_temp_pbn_candidates_2009_2010(
pbn_nbr				char(10)				NOT NULL
PRIMARY KEY CLUSTERED(pbn_nbr));

INSERT INTO #s1_temp_pbn_candidates_2009_2010(pbn_nbr)

SELECT DISTINCT 
pbn_nbr
FROM #s1_candidate_repair_2009_2010;

DROP TABLE IF EXISTS #s1_ent_pbn_mth_2009_2010;
CREATE TABLE #s1_ent_pbn_mth_2009_2010(
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL,
dim_month_key					int				NOT NULL,
PRIMARY KEY CLUSTERED (pbn_nbr,dim_month_key));

INSERT INTO #s1_ent_pbn_mth_2009_2010(
enterprise_nbr
,pbn_nbr
,dim_month_key)

SELECT DISTINCT
ir_ems_enterprise_nbr
,ir_ems_pbn_nbr
,dim_month_key = year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date)
FROM [$(idicleanversion)].[ir_clean].[ird_ems] e
JOIN #s1_temp_pbn_candidates_2009_2010 p
ON e.ir_ems_pbn_nbr = p.pbn_nbr
WHERE ir_ems_income_source_code = 'W&S'	
AND ir_ems_gross_earnings_amt > 0
AND ((year(ir_ems_return_period_date) = 2009 AND month(ir_ems_return_period_date) = 12)
  OR (year(ir_ems_return_period_date) = 2010 AND month(ir_ems_return_period_date) = 1));

DROP TABLE IF EXISTS #s1_candidate_nearest_2009_2010;
CREATE TABLE #s1_candidate_nearest_2009_2010(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL,
nearest_before					int					NULL,
nearest_after					int					NULL);

WITH s1_candidate_nearest_2009_2010 AS (
	SELECT
	x.snz_ird_uid
	,x.snz_employer_ird_uid
	,x.dim_month_key
	,nearest_before = max(CASE WHEN r.dim_month_key < x.dim_month_key THEN r.dim_month_key ELSE 0 END)
	,nearest_after = min(CASE WHEN r.dim_month_key > x.dim_month_key THEN r.dim_month_key ELSE 999912 END)
	FROM #s1_for_repair_200912_201001 x
	JOIN #s1_candidate_repair_2009_2010 r
	ON x.snz_ird_uid = r.snz_ird_uid
	AND x.snz_employer_ird_uid = r.snz_employer_ird_uid
	LEFT JOIN #s1_ent_pbn_mth_2009_2010 p
	ON r.pbn_nbr = p.pbn_nbr
	AND x.dim_month_key = p.dim_month_key
	WHERE r.enterprise_nbr = isnull(p.enterprise_nbr,r.enterprise_nbr) --Not a PBN violation
	GROUP BY x.snz_ird_uid,x.snz_employer_ird_uid,x.dim_month_key
) 
INSERT INTO #s1_candidate_nearest_2009_2010 (snz_ird_uid, snz_employer_ird_uid, dim_month_key, nearest_before, nearest_after)
SELECT 
 snz_ird_uid
 ,snz_employer_ird_uid
 ,dim_month_key
 ,nearest_before = CASE WHEN nearest_before = 0 THEN NULL ELSE nearest_before END
 ,nearest_after = CASE WHEN nearest_after = 999912 THEN NULL ELSE nearest_after END
FROM s1_candidate_nearest_2009_2010

INSERT INTO #s1_ent_pbn_repair(
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT
r.snz_ird_uid,
r.snz_employer_ird_uid,
n.dim_month_key,
r.enterprise_nbr,r.pbn_nbr
FROM #s1_candidate_repair_2009_2010 r
JOIN #s1_candidate_nearest_2009_2010 n
ON r.snz_ird_uid = n.snz_ird_uid
AND r.snz_employer_ird_uid = n.snz_employer_ird_uid
AND r.dim_month_key = CASE WHEN n.nearest_after IS NULL OR 
				      DATEDIFF(MONTH, CONVERT(DATETIME, CONVERT(VARCHAR, n.nearest_before) + '01'), CONVERT(DATETIME, CONVERT(VARCHAR, n.dim_month_key)   + '01')) <
					  DATEDIFF(MONTH, CONVERT(DATETIME, CONVERT(VARCHAR, n.dim_month_key) + '01'), CONVERT(DATETIME, CONVERT(VARCHAR, n.nearest_after)   + '01')) 
				      THEN n.nearest_before ELSE n.nearest_after END
LEFT JOIN #s1_ent_pbn_mth_2009_2010 p
ON r.pbn_nbr = p.pbn_nbr
AND n.dim_month_key = p.dim_month_key
WHERE r.enterprise_nbr = isnull(p.enterprise_nbr,r.enterprise_nbr);

DROP TABLE IF EXISTS #s1_ent_pbn_repair_IDI;
CREATE TABLE #s1_ent_pbn_repair_IDI(
snz_ird_uid						int				NOT NULL,
snz_employer_ird_uid			int				NOT NULL,
dim_month_key					int				NOT NULL,
enterprise_nbr					char(10)		NOT NULL,
pbn_nbr							char(10)		NOT NULL
PRIMARY KEY CLUSTERED (snz_ird_uid,snz_employer_ird_uid,dim_month_key));

INSERT INTO #s1_ent_pbn_repair_IDI (
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr) 

SELECT 
snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr
FROM #s1_ent_pbn_repair;

drop table IF EXISTS #s1_for_repair_EXNULL;
drop table IF EXISTS #s1_for_repair_tail;
drop table IF EXISTS #s1_employees_EXNULL;
drop table IF EXISTS #s1_employers_EXNULL;
drop table IF EXISTS #s1_ems_all_EXNULL;
drop table IF EXISTS #s1_pbn_at_EXNULL;
drop table IF EXISTS #s1_ent_pbn_repair;
drop table IF EXISTS #s1_temp_for_repair;
drop table IF EXISTS #s1_temp_pbn_candidates;
drop table IF EXISTS #s1_ent_pbn_mth;
drop table IF EXISTS #s1_temp_ent_pbn_repair;
drop table IF EXISTS #s1_employers_tail;
drop table IF EXISTS #s1_ems_recent_tail;
drop table IF EXISTS #s1_ent_pbn_repair_tail;
drop table IF EXISTS #s1_for_repair_200912_201001;
drop table IF EXISTS #s1_candidate_repair_2009_2010;
drop table IF EXISTS #s1_temp_pbn_candidates_2009_2010;
drop table IF EXISTS #s1_ent_pbn_mth_2009_2010;
drop table IF EXISTS #s1_candidate_nearest_2009_2010;

/*Code repairs simple enterprise_nbr breaks using PBN migration (see Motu Working Paper 11-01 for documentation)*/

DROP TABLE IF EXISTS #s2_ent_pbn_repair;
CREATE TABLE #s2_ent_pbn_repair(
snz_uid					int			NOT NULL,
snz_employer_ird_uid	int			NOT NULL,
dim_month_key			int			NOT NULL,
enterprise_nbr			char(10)    NOT NULL,
pbn_nbr					char(10)    NOT NULL
PRIMARY KEY CLUSTERED (snz_uid,snz_employer_ird_uid,dim_month_key));

INSERT INTO #s2_ent_pbn_repair(
snz_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT
snz_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr
FROM #s1_ent_pbn_repair_IDI r 
JOIN [$(idicleanversion)].[security].[concordance] s
ON r.snz_ird_uid = s.snz_ird_uid;

DROP TABLE IF EXISTS #s2_temp;
CREATE TABLE #s2_temp(
pbn_nbr					char(10)     NOT NULL,
enterprise_nbr			char(10)     NOT NULL,
dim_month_key			int			 NOT NULL);

INSERT INTO #s2_temp(
pbn_nbr
,enterprise_nbr
,dim_month_key)

SELECT DISTINCT 
pbn_nbr
,enterprise_nbr
,dim_month_key
FROM #s2_ent_pbn_repair
WHERE enterprise_nbr LIKE 'EN%';

INSERT INTO #s2_temp(
pbn_nbr
,enterprise_nbr
,dim_month_key)

SELECT DISTINCT
ir_ems_pbn_nbr
,ir_ems_enterprise_nbr
,dim_month_key = year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date)
FROM  [$(idicleanversion)].[ir_clean].[ird_ems] i
LEFT JOIN #s2_ent_pbn_repair r
ON i.snz_uid = r.snz_uid
AND i.snz_employer_ird_uid = r.snz_employer_ird_uid
AND year(i.ir_ems_return_period_date) * 100 + month(i.ir_ems_return_period_date) = r.dim_month_key
WHERE ir_ems_income_source_code = 'W&S'	
AND ir_ems_gross_earnings_amt > 0
AND ir_ems_enterprise_nbr LIKE 'EN%'
AND r.snz_uid IS NULL;			

/*Remove duplicates*/

DROP TABLE IF EXISTS #s2_load_lbf_fact_pbn_employee_count;
CREATE TABLE #s2_load_lbf_fact_pbn_employee_count(
pbn_nbr					char(10)     NOT NULL,
enterprise_nbr			char(10)     NOT NULL,
dim_month_key			int			 NOT NULL
PRIMARY KEY CLUSTERED (pbn_nbr,enterprise_nbr,dim_month_key));

INSERT INTO #s2_load_lbf_fact_pbn_employee_count(
pbn_nbr
,enterprise_nbr
,dim_month_key)

SELECT DISTINCT
pbn_nbr
,enterprise_nbr
,dim_month_key
FROM #s2_temp;

DROP TABLE IF EXISTS #s2_temp;

DROP TABLE IF EXISTS #s2_ent_first_last_ec;
CREATE TABLE #s2_ent_first_last_ec(
enterprise_nbr			char(10)     NOT NULL,
first_ec_mth			int			 NOT NULL,
last_ec_mth				int			 NOT NULL
PRIMARY KEY CLUSTERED (enterprise_nbr));

INSERT INTO #s2_ent_first_last_ec(
enterprise_nbr
,first_ec_mth
,last_ec_mth)

SELECT 
enterprise_nbr
,first_ec_mth = min(dim_month_key)
,last_ec_mth = max(dim_month_key)
FROM #s2_load_lbf_fact_pbn_employee_count
GROUP BY enterprise_nbr;

DROP TABLE IF EXISTS #s2_fact_lbf_enterprise_year;
CREATE TABLE #s2_fact_lbf_enterprise_year(
enterprise_nbr				char(10)     NOT NULL);

INSERT INTO #s2_fact_lbf_enterprise_year(enterprise_nbr)

SELECT DISTINCT br_ent_enterprise_nbr 
FROM [$(idicleanversion)].[br_clean].[enterprise] 
WHERE br_ent_enterprise_nbr LIKE 'EN%';

INSERT INTO #s2_fact_lbf_enterprise_year(enterprise_nbr)

SELECT DISTINCT enterprise_nbr 
FROM #s2_ent_first_last_ec
WHERE enterprise_nbr not in (SELECT enterprise_nbr FROM #s2_fact_lbf_enterprise_year);

DROP TABLE IF EXISTS #s2_mth_next;
CREATE TABLE #s2_mth_next(
dim_month_key	int		NOT NULL,
mth_next		int		NOT NULL
PRIMARY KEY CLUSTERED (dim_month_key));

INSERT INTO #s2_mth_next(
dim_month_key
,mth_next)

SELECT
dim_month_key,
mth_next = (DATEPART(YEAR, DATEADD(MONTH, 1, (CONVERT(VARCHAR, dim_month_key) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, 1, (CONVERT(VARCHAR, dim_month_key) + '01')))
FROM (SELECT DISTINCT dim_month_key 
		FROM #s2_load_lbf_fact_pbn_employee_count) x;

/*Collect all the employing pbns that change enterprises*/
DROP TABLE IF EXISTS #s2_pbn_links;
CREATE TABLE #s2_pbn_links(
pbn_nbr					char(10)        NOT NULL,
source_month			int				NOT NULL,
target_month			int				NOT NULL,
source_ent				char(10)        NOT NULL,
target_ent				char(10)        NOT NULL,
PRIMARY KEY CLUSTERED (pbn_nbr, source_month, target_month));

INSERT INTO #s2_pbn_links(
pbn_nbr
,source_month
,target_month
,source_ent
,target_ent)

SELECT 
p1.pbn_nbr,
source_month = p1.dim_month_key,
target_month = p2.dim_month_key,
source_ent = p1.enterprise_nbr,
target_ent = p2.enterprise_nbr
FROM #s2_load_lbf_fact_pbn_employee_count p1
JOIN #s2_mth_next m
ON p1.dim_month_key = m.dim_month_key
JOIN #s2_load_lbf_fact_pbn_employee_count p2
ON p1.pbn_nbr = p2.pbn_nbr
AND m.mth_next = p2.dim_month_key
WHERE p1.enterprise_nbr <> p2.enterprise_nbr;

/*Find enterprises that satisfy simple repair rules*/
DROP TABLE IF EXISTS #s2_repairs;
CREATE TABLE #s2_repairs(
source_month			int				NOT NULL,
source_ent				char(10)        NOT NULL,
target_ent				char(10)        NOT NULL,
PRIMARY KEY CLUSTERED (source_month, source_ent));

INSERT INTO #s2_repairs(
source_month
,source_ent
,target_ent)

SELECT DISTINCT 
source_month,
source_ent,
target_ent
FROM #s2_pbn_links pbn
JOIN #s2_ent_first_last_ec s
ON pbn.source_ent = s.enterprise_nbr
AND pbn.source_month = s.last_ec_mth  /*Source doesn't employ after transfer*/
JOIN #s2_ent_first_last_ec t
ON pbn.target_ent = t.enterprise_nbr
AND pbn.target_month = t.first_ec_mth /*Target doesn't employ before transfer*/
/*Source doesn't have another target in that month*/
AND NOT EXISTS (SELECT
				1
				FROM #s2_pbn_links ps
				WHERE ps.source_ent = pbn.source_ent
				AND ps.source_month = pbn.source_month
				AND ps.target_ent <> pbn.target_ent)
/*Target doesn't have another source in that month*/
AND NOT EXISTS (SELECT
				1
				FROM #s2_pbn_links pt
				WHERE pt.target_ent = pbn.target_ent
				AND pt.target_month = pbn.target_month
				AND pt.source_ent <> pbn.source_ent);

/*Create map between PENTs and enterprise_nbrs*/
DROP TABLE IF EXISTS #s2_pent;
CREATE TABLE #s2_pent(
pent				char(10)        NOT NULL,
enterprise_nbr		char(10)        NOT NULL,
start_month			int				NOT NULL,
end_month			int				NOT NULL,
PRIMARY KEY CLUSTERED (pent,enterprise_nbr));

DECLARE @s2_first_emp_mth int
SELECT @s2_first_emp_mth = MIN(dim_month_key)
FROM #s2_mth_next;

DECLARE @s2_last_emp_mth int
SELECT @s2_last_emp_mth = MAX(dim_month_key)
FROM #s2_mth_next;

/*Start with first enterprise number in the chain */
INSERT INTO #s2_pent(
pent
,enterprise_nbr
,start_month
,end_month)

SELECT
pent = source_ent,
enterprise_nbr = source_ent,
start_month = @s2_first_emp_mth,
end_month = source_month
FROM #s2_repairs
WHERE source_ent not in (SELECT target_ent FROM #s2_repairs);

/*Loop over subsequent enterprise_nbrs in the chain*/
DECLARE @s2_mth int
SELECT @s2_mth = MIN(source_month) FROM #s2_repairs;

DECLARE @s2_last_mth int
SELECT @s2_last_mth = MAX(source_month) FROM #s2_repairs;

WHILE @s2_mth <= @s2_last_mth
BEGIN
/*Truncate link period end for source enterprise*/
	update #s2_pent
	SET end_month = @s2_mth
	WHERE enterprise_nbr in (SELECT source_ent
								FROM #s2_repairs
								WHERE source_month = @s2_mth)

/*Set target PENT to source PENT*/
	INSERT INTO #s2_pent(
	pent
	,enterprise_nbr
	,start_month
	,end_month)
	SELECT
	pent = pen.pent,
	enterprise_nbr = target_ent,
	start_month = (DATEPART(YEAR, DATEADD(MONTH, 1, (CONVERT(VARCHAR, @s2_mth) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, 1, (CONVERT(VARCHAR, @s2_mth) + '01'))),
	end_month = @s2_last_emp_mth
	FROM #s2_repairs rep
	JOIN #s2_pent pen
	ON rep.source_ent = pen.enterprise_nbr
	WHERE source_month = @s2_mth
   	SET @s2_mth = (DATEPART(YEAR, DATEADD(MONTH, 1, (CONVERT(VARCHAR, @s2_mth) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, 1, (CONVERT(VARCHAR, @s2_mth) + '01')))
END;

/*Add single-enterprise_nbr PENTs*/
INSERT INTO #s2_pent(
pent
,enterprise_nbr
,start_month
,end_month)

SELECT
pent = enterprise_nbr,
enterprise_nbr,
start_month = @s2_first_emp_mth,
end_month = @s2_last_emp_mth
FROM #s2_fact_lbf_enterprise_year
WHERE enterprise_nbr not in (SELECT enterprise_nbr FROM #s2_pent)
GROUP BY enterprise_nbr;

DROP TABLE IF EXISTS #s2_pent_IDI;
CREATE TABLE #s2_pent_IDI(
pent				char(10)        NOT NULL,
enterprise_nbr		char(10)        NOT NULL,
start_month			int				NOT NULL,
end_month			int				NOT NULL,
PRIMARY KEY CLUSTERED (pent,enterprise_nbr));

INSERT INTO #s2_pent_IDI(
pent
,enterprise_nbr
,start_month
,end_month)

SELECT 
pent
,enterprise_nbr
,start_month
,end_month 
FROM #s2_pent

drop table IF EXISTS #s2_ent_pbn_repair;
drop table IF EXISTS #s2_temp;
drop table IF EXISTS #s2_load_lbf_fact_pbn_employee_count;
drop table IF EXISTS #s2_ent_first_last_ec;
drop table IF EXISTS #s2_fact_lbf_enterprise_year;
drop table IF EXISTS #s2_mth_next;
drop table IF EXISTS #s2_pbn_links;
drop table IF EXISTS #s2_repairs;
drop table IF EXISTS #s2_pent;

/*Create permanent view of balance month based on predominant heirarchy (total EC > EC mth > active mth > total mths > most recent). This is needed becasue WP filing is annual*/

DROP TABLE IF EXISTS #s3_pent;
CREATE TABLE #s3_pent(
pent				char(10)	NOT NULL,
enterprise_nbr		char(10)	NOT NULL,
start_month			int			NOT NULL,
end_month			int			NOT NULL
PRIMARY KEY CLUSTERED(pent,enterprise_nbr));

INSERT INTO #s3_pent(
pent
,enterprise_nbr
,start_month
,end_month)

SELECT 
pent
,enterprise_nbr
,start_month
,end_month
FROM #s2_pent_IDI;

DROP TABLE IF EXISTS #s3_ent_pbn_repair;
CREATE TABLE #s3_ent_pbn_repair(
snz_uid					int			NOT NULL,
snz_employer_ird_uid	int			NOT NULL,
dim_month_key			int			NOT NULL,
enterprise_nbr			char(10)    NOT NULL,
pbn_nbr					char(10)    NOT NULL
PRIMARY KEY CLUSTERED(snz_uid,snz_employer_ird_uid,dim_month_key));

INSERT INTO #s3_ent_pbn_repair(
snz_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr)

SELECT
snz_uid
,snz_employer_ird_uid
,dim_month_key
,enterprise_nbr
,pbn_nbr
FROM #s1_ent_pbn_repair_IDI r
JOIN [$(idicleanversion)].[security].[concordance] s
ON r.snz_ird_uid = s.snz_ird_uid;

DROP TABLE IF EXISTS #s3_temp;
CREATE TABLE #s3_temp(
enterprise_nbr			char(10)    NOT NULL,
snz_uid					int			NOT NULL,
dim_month_key			int			NOT NULL);

INSERT INTO #s3_temp(
enterprise_nbr
,snz_uid
,dim_month_key)

SELECT
enterprise_nbr = isnull(r.enterprise_nbr,ir_ems_enterprise_nbr),
i.snz_uid,
dim_month_key = year(ir_ems_return_period_date) * 100 + month(ir_ems_return_period_date)
FROM  [$(idicleanversion)].[ir_clean].[ird_ems] i
LEFT JOIN #s3_ent_pbn_repair r
ON i.snz_uid = r.snz_uid
AND i.snz_employer_ird_uid = r.snz_employer_ird_uid
AND year(i.ir_ems_return_period_date) * 100 + month(i.ir_ems_return_period_date) = r.dim_month_key
WHERE i.■■■■■■■■■■■■■■■				
AND ir_ems_income_source_code = 'W&S'	
AND ir_ems_gross_earnings_amt > 0
AND isnull(r.enterprise_nbr,ir_ems_enterprise_nbr) IS NOT NULL;

DROP TABLE IF EXISTS #s3_pent_ec_mth;
CREATE TABLE #s3_pent_ec_mth(
pent				char(10)	NOT NULL,
dim_month_key		int			NOT NULL,
ec				int			NOT NULL
PRIMARY KEY CLUSTERED(pent,dim_month_key));

INSERT INTO #s3_pent_ec_mth(
pent
,dim_month_key
,ec)

SELECT
pent
,dim_month_key
,ec = count(*)	
FROM #s3_temp e
JOIN #s3_pent p
ON p.enterprise_nbr = e.enterprise_nbr 
GROUP BY pent,dim_month_key;

DELETE FROM #s3_pent_ec_mth
WHERE dim_month_key > (SELECT max(end_month) FROM #s3_pent)

DROP TABLE IF EXISTS #s3_mth;
CREATE TABLE #s3_mth(
dim_month_key		int			NOT NULL);

INSERT INTO #s3_mth(dim_month_key)

SELECT DISTINCT dim_month_key FROM #s3_pent_ec_mth;

/*Need balance date to assign PAYE activity of WPs to dim_year_key*/

DROP TABLE IF EXISTS #s3_ent_bal_date;
CREATE TABLE #s3_ent_bal_date(
enterprise_nbr			char(10)	NOT NULL,
dim_start_month_key		int			NOT NULL,
dim_end_month_key		int			NOT NULL,
life_cycle_code			char(4)		NOT NULL,
balance_month_nbr		tinyint			NULL
PRIMARY KEY CLUSTERED(enterprise_nbr,dim_start_month_key));

INSERT INTO #s3_ent_bal_date(
enterprise_nbr
,dim_start_month_key
,dim_end_month_key
,life_cycle_code
,balance_month_nbr)

SELECT 
enterprise_nbr
,dim_start_month_key
,dim_end_month_key
,life_cycle_code
,balance_month_nbr
FROM LBD_Clean.lbf_clean.load_lbf_fact_enterprise;

UPDATE #s3_ent_bal_date
SET dim_end_month_key  = (SELECT max(dim_month_key) FROM #s3_pent_ec_mth)
WHERE dim_end_month_key = (SELECT max(dim_end_month_key) FROM #s3_ent_bal_date);

DROP TABLE IF EXISTS #s3_pent_bal_date;
CREATE TABLE #s3_pent_bal_date(
pent					char(10)	NOT NULL,
balance_month_nbr		tinyint		NOT NULL,
method					varchar(7)	NOT NULL
PRIMARY KEY CLUSTERED(pent));

INSERT INTO #s3_pent_bal_date(
pent
,balance_month_nbr
,method)

SELECT
pent,
balance_month_nbr = max(isnull(balance_month_nbr, 0)),
method = 'one'
FROM #s3_pent p
JOIN #s3_ent_bal_date b
ON p.enterprise_nbr = b.enterprise_nbr
AND b.dim_start_month_key <= p.end_month
AND b.dim_end_month_key >= p.start_month
GROUP BY pent
HAVING max(isnull(balance_month_nbr,0)) = min(isnull(balance_month_nbr, 13));

INSERT INTO #s3_pent_bal_date(
pent
,balance_month_nbr
,method)

SELECT
pent,
balance_month_nbr = 3,
method = 'imputed'
FROM #s3_pent p
LEFT JOIN #s3_ent_bal_date b
ON p.enterprise_nbr = b.enterprise_nbr
AND b.dim_start_month_key <= p.end_month
AND b.dim_end_month_key >= p.start_month
GROUP BY pent
HAVING max(isnull(balance_month_nbr,0)) = 0 ;

DROP TABLE IF EXISTS #s3_pent_ent_mth;

CREATE TABLE #s3_pent_ent_mth(
pent					char(10)	NOT NULL,
enterprise_nbr			char(10)	NOT NULL,
dim_month_key			int			NOT NULL
PRIMARY KEY CLUSTERED(pent,dim_month_key));

INSERT INTO #s3_pent_ent_mth(
pent
,enterprise_nbr
,dim_month_key)

SELECT 
pent
,enterprise_nbr
,dim_month_key
FROM #s3_pent p
JOIN #s3_mth m
ON m.dim_month_key BETWEEN p.start_month AND p.end_month
WHERE pent NOT IN(SELECT pent FROM #s3_pent_bal_date);

DROP TABLE IF EXISTS #s3_pent_bal_mth;
CREATE TABLE #s3_pent_bal_mth(
pent					char(10)	NOT NULL,
dim_month_key			int			NOT NULL,
life_cycle_code			char(4)		NOT NULL,
balance_month_nbr		tinyint			NULL,
ec						int			NOT NULL
PRIMARY KEY CLUSTERED(pent,dim_month_key));

INSERT INTO #s3_pent_bal_mth(
pent
,dim_month_key
,life_cycle_code
,balance_month_nbr
,ec)

SELECT 
p.pent
,p.dim_month_key
,life_cycle_code
,balance_month_nbr
,ec = isnull(ec,0)
FROM #s3_pent_ent_mth p
JOIN #s3_ent_bal_date b
ON p.enterprise_nbr = b.enterprise_nbr
AND p.dim_month_key BETWEEN b.dim_start_month_key AND b.dim_end_month_key
LEFT JOIN #s3_pent_ec_mth e
ON p.pent = e.pent
AND p.dim_month_key = e.dim_month_key;

DROP TABLE IF EXISTS #s3_pent_bal_ec;
CREATE TABLE #s3_pent_bal_ec(
pent					char(10)	NOT NULL,
balance_month_nbr		tinyint		NOT NULL,
ec						float		NOT NULL,
n_ec_mths				int			NOT NULL,
n_active_mths			int			NOT NULL,
n_mths					int			NOT NULL,
max_mth					int			NOT NULL
PRIMARY KEY CLUSTERED(pent,balance_month_nbr));

INSERT INTO #s3_pent_bal_ec(
pent
,balance_month_nbr
,ec
,n_ec_mths
,n_active_mths
,n_mths
,max_mth)

SELECT 
pent,balance_month_nbr,
ec			 = sum(ec),
n_ec_mths    = sum(case when ec > 0 then 1 else 0 end),
n_active_mths= sum(case when ec > 0 OR life_cycle_code in ('birt','reac') then 1 else 0 end),
n_mths		 = count(*),
max_mth		 = max(dim_month_key)
FROM #s3_pent_bal_mth
WHERE balance_month_nbr IS NOT NULL 
GROUP BY pent, balance_month_nbr;

INSERT INTO #s3_pent_bal_date(
pent
,balance_month_nbr
,method)

SELECT
i.pent,
balance_month_nbr = max(balance_month_nbr),
method = 'emp'
FROM #s3_pent_bal_ec i
JOIN (SELECT
		pent,
		max_ec = max(ec)
		FROM #s3_pent_bal_ec
		GROUP BY pent) x
ON i.pent = x.pent
AND i.ec = x.max_ec
WHERE i.pent NOT IN(SELECT pent FROM #s3_pent_bal_date)
GROUP BY i.pent
HAVING count(*) = 1;

INSERT INTO #s3_pent_bal_date(
pent
,balance_month_nbr
,method)

SELECT
i.pent,
balance_month_nbr = max(balance_month_nbr),
method = 'emp mth'
FROM #s3_pent_bal_ec i
JOIN (SELECT
		pent,
		max_nmth = max(n_ec_mths)
		FROM #s3_pent_bal_ec
		GROUP BY pent) x
ON i.pent = x.pent
AND i.n_ec_mths = x.max_nmth
WHERE i.pent NOT IN(SELECT pent FROM #s3_pent_bal_date)
GROUP BY i.pent
HAVING count(*) = 1;

INSERT INTO #s3_pent_bal_date(
pent
,balance_month_nbr
,method)

SELECT
i.pent,
balance_month_nbr = max(balance_month_nbr),
method = 'act mth'
FROM #s3_pent_bal_ec i
JOIN (SELECT
		pent,
		max_nmth = max(n_active_mths)
		FROM #s3_pent_bal_ec
		GROUP BY pent) x
ON i.pent = x.pent
AND i.n_active_mths = x.max_nmth
WHERE i.pent NOT IN(SELECT pent FROM #s3_pent_bal_date)
GROUP BY i.pent
HAVING count(*) = 1;

INSERT INTO #s3_pent_bal_date(
pent
,balance_month_nbr
,method)

SELECT
i.pent,
balance_month_nbr,
method = 'last'
FROM #s3_pent_bal_ec i
JOIN (SELECT
		pent,
		max_mth = max(max_mth)  
		FROM #s3_pent_bal_ec
		GROUP BY pent) x
ON i.pent = x.pent
AND i.max_mth = x.max_mth
WHERE i.pent NOT IN(SELECT pent FROM #s3_pent_bal_date);

/*Get balance date, start and end months and count number of EMS months available*/

DROP TABLE IF EXISTS #s3_lbd_bal_year;
CREATE TABLE #s3_lbd_bal_year(
dim_year_key			int			NOT NULL,
bal_month_nbr			tinyint		NOT NULL,
start_month_key			int			NOT NULL,
end_month_key			int			NOT NULL
PRIMARY KEY CLUSTERED(dim_year_key,bal_month_nbr));

INSERT INTO #s3_lbd_bal_year(
dim_year_key
,bal_month_nbr
,start_month_key
,end_month_key)

SELECT 
dim_year_key
,bal_month_nbr
,start_month_key
,end_month_key
FROM LBD_Clean.reference.dim_bal_date_year 
WHERE dim_year_key >= 199503 
AND dim_year_key < 999903;

DROP TABLE IF EXISTS #s3_dim_bal_date_year;
CREATE TABLE #s3_dim_bal_date_year(
dim_year_key			int			NOT NULL,
bal_month_nbr			tinyint		NOT NULL,
bal_start_month_key		int			NOT NULL,
bal_end_month_key		int			NOT NULL,
first_EMS_mth			int			NOT NULL,
last_EMS_mth			int			NOT NULL,
n_EMS_mths				int			NOT NULL
PRIMARY KEY CLUSTERED(dim_year_key,bal_month_nbr));

INSERT INTO #s3_dim_bal_date_year(
dim_year_key
,bal_month_nbr
,bal_start_month_key
,bal_end_month_key
,first_EMS_mth
,last_EMS_mth
,n_EMS_mths)

SELECT 
dim_year_key
,bal_month_nbr
,start_month_key
,end_month_key
,first_EMS_mth = min(dim_month_key)
,last_EMS_mth = max(dim_month_key)
,n_EMS_mths = count(*)
FROM #s3_mth m
JOIN #s3_lbd_bal_year b
ON m.dim_month_key BETWEEN b.start_month_key AND b.end_month_key
GROUP BY dim_year_key,bal_month_nbr,start_month_key,end_month_key;

DROP TABLE IF EXISTS #s3_pent_bal_date_IDI;
CREATE TABLE #s3_pent_bal_date_IDI(
pent					char(10)	NOT NULL,
balance_month_nbr		tinyint		NOT NULL,
method					varchar(7)	NOT NULL
PRIMARY KEY CLUSTERED(pent));

INSERT INTO #s3_pent_bal_date_IDI(
pent
,balance_month_nbr
,method)

SELECT 
pent
,balance_month_nbr
,method 
FROM #s3_pent_bal_date;

DROP TABLE IF EXISTS #s3_dim_bal_date_year_IDI;
CREATE TABLE #s3_dim_bal_date_year_IDI(
dim_year_key			int			NOT NULL,
bal_month_nbr			tinyint		NOT NULL,
bal_start_month_key		int			NOT NULL,
bal_end_month_key		int			NOT NULL,
first_EMS_mth			int			NOT NULL,
last_EMS_extail_mth		int			NOT NULL,
n_EMS_extail_mths		int			NOT NULL
PRIMARY KEY CLUSTERED(dim_year_key,bal_month_nbr));

INSERT INTO #s3_dim_bal_date_year_IDI(
dim_year_key
,bal_month_nbr
,bal_start_month_key
,bal_end_month_key
,first_EMS_mth
,last_EMS_extail_mth
,n_EMS_extail_mths)

SELECT
dim_year_key
,bal_month_nbr
,bal_start_month_key
,bal_end_month_key
,first_EMS_mth
,last_EMS_mth
,n_EMS_mths
FROM #s3_dim_bal_date_year;

/*Identify working proprietors in data*/

DROP TABLE IF EXISTS #s3_individuals;
CREATE TABLE #s3_individuals(
snz_uid			int			NOT NULL,
snz_ird_uid		int			NOT NULL);
	
INSERT INTO #s3_individuals(
snz_uid
,snz_ird_uid)

SELECT DISTINCT
snz_uid
,snz_ird_uid
FROM [$(idicleanversion)].[ir_clean].[ird_ems]
WHERE ir_ems_income_source_code = 'W&S'
AND ir_ems_gross_earnings_amt > 0;
	
INSERT INTO #s3_individuals(
snz_uid
,snz_ird_uid)

SELECT DISTINCT
snz_uid
,snz_ird_uid
FROM [$(idicleanversion)].[ir_clean].[ird_customers]
WHERE ir_cus_entity_type_code = 'I'
AND snz_uid NOT IN(SELECT snz_uid FROM #s3_individuals);

DROP TABLE IF EXISTS #s3_xref_unique;
CREATE TABLE #s3_xref_unique(
snz_ird_uid				int				NOT NULL,
pent					char(10)		NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,pent));

/*Insert one-to-one relationships*/
INSERT INTO #s3_xref_unique(
snz_ird_uid
,pent)

SELECT
br_xref_ird_uid,
pent = min(pent)
FROM [$(idicleanversion)].[br_clean].[ird_enterprise_xref] x
JOIN #s3_pent p
ON x.br_xref_enterprise_nbr = p.enterprise_nbr
GROUP BY br_xref_ird_uid
HAVING min(pent) = max(pent);

/*Insert one-to-many relationships*/
DROP TABLE IF EXISTS #s3_xref_multi;
CREATE TABLE #s3_xref_multi(
snz_ird_uid				int				NOT NULL,
pent					char(10)		NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,pent));

INSERT INTO #s3_xref_multi(
snz_ird_uid
,pent)

SELECT DISTINCT 
br_xref_ird_uid
,pent
FROM [$(idicleanversion)].[br_clean].[ird_enterprise_xref] x
JOIN #s3_pent p
ON x.br_xref_enterprise_nbr = p.enterprise_nbr
WHERE br_xref_ird_uid NOT IN(SELECT snz_ird_uid FROM #s3_xref_unique);

DROP TABLE IF EXISTS #s3_xref_multi_active_gst;
CREATE TABLE #s3_xref_multi_active_gst(
snz_ird_uid				int				NOT NULL,
pent					char(10)		NOT NULL,
dim_year_key			int				NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,pent,dim_year_key));

INSERT INTO #s3_xref_multi_active_gst(
snz_ird_uid
,pent
,dim_year_key)

SELECT DISTINCT
snz_ird_uid
,x.pent
,dim_year_key
FROM #s3_xref_multi x
JOIN #s3_pent p
ON x.pent = p.pent
JOIN #s3_pent_bal_date_IDI b
ON x.pent = b.pent
JOIN #s3_lbd_bal_year y
ON b.balance_month_nbr = y.bal_month_nbr
JOIN (SELECT 
		enterprise_nbr
		,dim_month_key
		FROM LBD_Clean.gst_clean.load_gst_return g 
		LEFT JOIN LBD_Clean.lbf_clean.load_lbf_enterprise_ird_link i
		ON g.gst_ird_uid = i.ird_uid
		WHERE gst_tot_sales > 0
		   OR gst_tot_purexp < 0) g
ON p.enterprise_nbr = g.enterprise_nbr
AND g.dim_month_key BETWEEN y.start_month_key AND y.end_month_key
WHERE dim_year_key >= 199503; 

DROP TABLE IF EXISTS #s3_xref_multi_active_ems;
CREATE TABLE #s3_xref_multi_active_ems(
snz_ird_uid				int				NOT NULL,
pent					char(10)		NOT NULL,
dim_year_key			int				NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,pent,dim_year_key));

INSERT INTO #s3_xref_multi_active_ems(snz_ird_uid,pent,dim_year_key)
SELECT DISTINCT
x.snz_ird_uid,x.pent,dim_year_key
FROM #s3_xref_multi x
JOIN #s3_pent_bal_date_IDI b
ON x.pent = b.pent
JOIN #s3_lbd_bal_year y
ON b.balance_month_nbr = y.bal_month_nbr
JOIN #s3_pent_ec_mth e
ON x.pent = e.pent
AND e.dim_month_key BETWEEN y.start_month_key AND y.end_month_key;

DROP TABLE IF EXISTS #s3_xref_multi_active_i10;
CREATE TABLE #s3_xref_multi_active_i10(
snz_ird_uid				int				NOT NULL,
pent					char(10)		NOT NULL,
dim_year_key			int				NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,pent,dim_year_key));

INSERT INTO #s3_xref_multi_active_i10(
snz_ird_uid
,pent
,dim_year_key)

SELECT DISTINCT
snz_ird_uid
,x.pent
,dim_year_key
FROM #s3_xref_multi x
JOIN #s3_pent p ON x.pent = p.pent
JOIN (
       SELECT 
		i10.enterprise_nbr
		,i10.dim_year_key
		FROM (
		    SELECT 
			dim_year_key,
			enterprise_nbr,
			totinc = isnull(i10p_totinc,0),
			totexp = isnull(i10p_totexp,0)
			FROM [LBD_Clean].[i10_clean].[fact_i10_enterprise_year_pre2013]
			UNION
			SELECT 
			dim_year_key,
			enterprise_nbr,
			totinc = isnull(i10_totinc,0),
			totexp = isnull(i10_totexp,0)
			FROM [LBD_Clean].[i10_clean].[fact_i10_enterprise_year]
			) i10
			WHERE totinc > 0  OR totexp > 0
	 ) i
ON p.enterprise_nbr = i.enterprise_nbr;

INSERT INTO #s3_xref_unique(
snz_ird_uid
,pent)

SELECT
snz_ird_uid
,pent = min(pent)
FROM #s3_xref_multi_active_gst
GROUP BY snz_ird_uid
HAVING min(pent) = max(pent);

INSERT INTO #s3_xref_unique(
snz_ird_uid
,pent)

SELECT
snz_ird_uid
,pent = min(pent)
FROM #s3_xref_multi_active_i10
WHERE snz_ird_uid NOT IN(SELECT snz_ird_uid FROM #s3_xref_multi_active_gst)
GROUP BY snz_ird_uid
HAVING min(pent) = max(pent);

INSERT INTO #s3_xref_unique(
snz_ird_uid
,pent)

SELECT
snz_ird_uid
,pent = min(pent)
FROM #s3_xref_multi_active_ems
WHERE snz_ird_uid NOT IN(SELECT snz_ird_uid FROM #s3_xref_multi_active_gst)
  AND snz_ird_uid NOT IN(SELECT snz_ird_uid FROM #s3_xref_multi_active_i10)
GROUP BY snz_ird_uid
HAVING min(pent) = max(pent);

DROP TABLE IF EXISTS #s3_xref_multi_yr;
CREATE TABLE #s3_xref_multi_yr(
snz_ird_uid				int				NOT NULL,
pent					char(10)		NOT NULL,
dim_year_key			int				NOT NULL,
active_source			varchar(6)		NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,pent,dim_year_key));

INSERT INTO #s3_xref_multi_yr(
snz_ird_uid
,pent
,dim_year_key
,active_source)

SELECT 
snz_ird_uid
,pent
,dim_year_key
,active_source = 'GST'
FROM #s3_xref_multi_active_gst
WHERE snz_ird_uid NOT IN(SELECT snz_ird_uid FROM #s3_xref_unique);

INSERT INTO #s3_xref_multi_yr(
snz_ird_uid
,pent
,dim_year_key
,active_source)

SELECT 
a.snz_ird_uid
,pent
,a.dim_year_key
,active_source = 'I10'
FROM #s3_xref_multi_active_i10 a
LEFT JOIN (SELECT 
			snz_ird_uid
			,dim_year_key
			FROM #s3_xref_multi_yr) y
ON a.snz_ird_uid = y.snz_ird_uid
AND a.dim_year_key = y.dim_year_key
WHERE a.snz_ird_uid NOT IN(SELECT snz_ird_uid FROM #s3_xref_unique)
  AND y.snz_ird_uid IS NULL; 

INSERT INTO #s3_xref_multi_yr(
snz_ird_uid
,pent
,dim_year_key
,active_source)

SELECT 
a.snz_ird_uid
,pent
,a.dim_year_key
,active_source = 'EMS'
FROM #s3_xref_multi_active_ems a
LEFT JOIN (SELECT snz_ird_uid,dim_year_key
			FROM #s3_xref_multi_yr) y
ON a.snz_ird_uid = y.snz_ird_uid
AND a.dim_year_key = y.dim_year_key
WHERE a.snz_ird_uid NOT IN(SELECT snz_ird_uid FROM #s3_xref_unique)
  AND y.snz_ird_uid IS NULL; 

INSERT INTO #s3_xref_multi_yr(
snz_ird_uid
,pent
,dim_year_key
,active_source)

SELECT 
s.snz_ird_uid
,m.pent
,y.dim_year_key
,active_source = 'EXTRAP'
FROM (SELECT
	snz_ird_uid,
	first_yr = min(dim_year_key)
	FROM #s3_xref_multi_yr
	GROUP BY snz_ird_uid) s
JOIN (SELECT DISTINCT dim_year_key
		FROM #s3_xref_multi_yr) y
ON y.dim_year_key < s.first_yr
JOIN #s3_xref_multi_yr m
ON s.snz_ird_uid = m.snz_ird_uid
AND s.first_yr = m.dim_year_key;

INSERT INTO #s3_xref_multi_yr(
snz_ird_uid
,pent
,dim_year_key
,active_source)

SELECT 
s.snz_ird_uid
,m.pent
,y.dim_year_key
,active_source = 'EXTRAP'
FROM (SELECT
	snz_ird_uid,
	last_yr = max(dim_year_key)
	FROM #s3_xref_multi_yr
	GROUP BY snz_ird_uid) s
JOIN (SELECT DISTINCT dim_year_key
		FROM #s3_xref_multi_yr) y
ON y.dim_year_key > s.last_yr
JOIN #s3_xref_multi_yr m
ON s.snz_ird_uid = m.snz_ird_uid
AND s.last_yr = m.dim_year_key;

INSERT INTO #s3_xref_multi_yr(
snz_ird_uid
,pent
,dim_year_key
,active_source)

SELECT 
snz_ird_uid
,pent
,y.dim_year_key
,active_source = 'INTERP' 
FROM (SELECT 
	c.snz_ird_uid
	,c.pent
	,c.dim_year_key
	,next_obs_yr = min(n.dim_year_key)
	,next_obs_match = max(case when c.pent = n.pent then 1 else 0 end)
	FROM #s3_xref_multi_yr c
	JOIN #s3_xref_multi_yr n
	ON c.snz_ird_uid = n.snz_ird_uid
	AND n.dim_year_key > c.dim_year_key
	GROUP BY c.snz_ird_uid,c.pent,c.dim_year_key) x
JOIN (SELECT DISTINCT dim_year_key
		FROM #s3_xref_multi_yr) y
ON  y.dim_year_key > x.dim_year_key
AND y.dim_year_key < x.next_obs_yr
WHERE next_obs_match = 1
ORDER BY y.dim_year_key;

DROP TABLE IF EXISTS #s3_xref_yr;
CREATE TABLE #s3_xref_yr(
snz_ird_uid				int				NOT NULL,
pent					char(10)		NOT NULL,
dim_year_key			int				NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,pent,dim_year_key));

INSERT INTO #s3_xref_yr(
snz_ird_uid
,pent
,dim_year_key)

SELECT 
snz_ird_uid
,pent
,dim_year_key
FROM #s3_xref_multi_yr
WHERE dim_year_key <= $(wp_excl);

INSERT INTO #s3_xref_yr(
snz_ird_uid
,pent
,dim_year_key)

SELECT 
snz_ird_uid
,pent
,dim_year_key
FROM #s3_xref_unique x
JOIN (SELECT DISTINCT dim_year_key 
		FROM #s3_xref_yr) y
ON 1 = 1;

DROP TABLE IF EXISTS #s3_pent_wp_yr_nonwage_dup;
CREATE TABLE #s3_pent_wp_yr_nonwage_dup(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL,
dim_year_key			int				NOT NULL,
ir_type					varchar(15)		NOT NULL);

INSERT INTO #s3_pent_wp_yr_nonwage_dup(
pent
,snz_uid
,dim_year_key
,ir_type)

SELECT 
pent
,i.snz_uid
,dim_year_key
,ir_type = 'ir3'
FROM #s3_individuals p
JOIN [$(idicleanversion)].[ir_clean].[ird_rtns_keypoints_ir3] i
ON p.snz_uid = i.snz_uid
JOIN #s3_xref_yr x
ON i.snz_ird_uid = x.snz_ird_uid
AND year(i.ir_ir3_return_period_date) * 100 + 3 = x.dim_year_key
WHERE ir_ir3_net_profit_amt <> 0;

INSERT INTO #s3_pent_wp_yr_nonwage_dup(
pent
,snz_uid
,dim_year_key
,ir_type)

SELECT 
pent
,i.snz_uid
,dim_year_key
,ir_type = 'ir7p'
from #s3_individuals p
JOIN [$(idicleanversion)].[ir_clean].[ird_attachments_ir20] i
ON p.snz_uid = i.snz_uid
JOIN [$(idicleanversion)].[ir_clean].[ird_rtns_keypoints_ir3] i2
ON p.snz_uid = i2.snz_uid
AND year(i.ir_ir20_return_period_date) = year(i2.ir_ir3_return_period_date)
JOIN #s3_xref_yr x
ON i.snz_employer_ird_uid = x.snz_ird_uid
AND year(i.ir_ir20_return_period_date) * 100 + 3 = x.dim_year_key
WHERE ir_ir20_tot_share_of_inc_865_amt <> 0
  AND ir_ir3_tot_pship_income_amt <> 0;
  
DROP TABLE IF EXISTS #s3_wp_yr_part_miss;
CREATE TABLE #s3_wp_yr_part_miss(
snz_uid					int				NOT NULL,
dim_year_key			int				NOT NULL,
ir_type					varchar(15)		NOT NULL);

INSERT INTO #s3_wp_yr_part_miss(
snz_uid
,dim_year_key
,ir_type)

SELECT DISTINCT
i2.snz_uid,
dim_year_key = year(i2.ir_ir3_return_period_date) * 100 + 3,
ir_type = 'ir7p_missing'
FROM #s3_individuals p
JOIN [$(idicleanversion)].[ir_clean].[ird_rtns_keypoints_ir3] i2
ON p.snz_uid = i2.snz_uid
LEFT JOIN [$(idicleanversion)].[ir_clean].[ird_attachments_ir20] i
ON p.snz_uid = i.snz_uid
AND year(i2.ir_ir3_return_period_date) = year(i.ir_ir20_return_period_date)
WHERE ir_ir20_tot_share_of_inc_865_amt IS NULL
AND ir_ir3_tot_pship_income_amt <> 0
AND year(i2.ir_ir3_return_period_date) * 100 + 3 in (SELECT DISTINCT dim_year_key FROM #s3_xref_yr);

/*Partners where partnership return missing, but IR3 indicates active partner*/
INSERT INTO #s3_pent_wp_yr_nonwage_dup(
pent
,snz_uid
,dim_year_key
,ir_type)

SELECT 
pent = min(pent),
m.snz_uid,
m.dim_year_key,
ir_type = min(m.ir_type)
FROM #s3_wp_yr_part_miss m
JOIN #s3_pent_wp_yr_nonwage_dup p
ON m.snz_uid = p.snz_uid
AND p.dim_year_key BETWEEN m.dim_year_key - 200 AND m.dim_year_key + 200 
WHERE p.ir_type = 'ir7p'
GROUP BY m.snz_uid,m.dim_year_key
HAVING min(pent) = max(pent);

/*Remove IR7 imputation where an IR7 associated with WPs already exists*/
DELETE i
FROM #s3_pent_wp_yr_nonwage_dup i
JOIN (SELECT 
		DISTINCT pent,dim_year_key
		FROM #s3_pent_wp_yr_nonwage_dup
		WHERE ir_type = 'ir7p') r
ON i.pent = r.pent
AND i.dim_year_key = r.dim_year_key
WHERE i.ir_type = 'ir7p_missing';

/*"Shareholders salary" may be paid to non-business owners impose threshold to exclude large number receiving low income. Threshold is set at $15,000 (2000 dollars) - see Fabling & Mare labour paper for motivation*/

DROP TABLE IF EXISTS #s3_threshold;
CREATE TABLE #s3_threshold(
tax_year		int					NOT NULL,
threshold		decimal(13,2)		NOT NULL
PRIMARY KEY CLUSTERED(tax_year));

INSERT INTO #s3_threshold(tax_year,threshold) values(1995,13890.11);
INSERT INTO #s3_threshold(tax_year,threshold) values(1996,14357.14);
INSERT INTO #s3_threshold(tax_year,threshold) values(1997,14664.84);
INSERT INTO #s3_threshold(tax_year,threshold) values(1998,14824.18);
INSERT INTO #s3_threshold(tax_year,threshold) values(1999,14956.04);
INSERT INTO #s3_threshold(tax_year,threshold) values(2000,15000.00);
INSERT INTO #s3_threshold(tax_year,threshold) values(2001,15450.55);
INSERT INTO #s3_threshold(tax_year,threshold) values(2002,15846.15);
INSERT INTO #s3_threshold(tax_year,threshold) values(2003,16258.24);
INSERT INTO #s3_threshold(tax_year,threshold) values(2004,16505.49);
INSERT INTO #s3_threshold(tax_year,threshold) values(2005,16934.07);
INSERT INTO #s3_threshold(tax_year,threshold) values(2006,17467.03);
INSERT INTO #s3_threshold(tax_year,threshold) values(2007,18027.47);
INSERT INTO #s3_threshold(tax_year,threshold) values(2008,18494.51);
INSERT INTO #s3_threshold(tax_year,threshold) values(2009,19197.80);
INSERT INTO #s3_threshold(tax_year,threshold) values(2010,19571.43);
INSERT INTO #s3_threshold(tax_year,threshold) values(2011,20131.87);
INSERT INTO #s3_threshold(tax_year,threshold) values(2012,20802.20);
INSERT INTO #s3_threshold(tax_year,threshold) values(2013,20989.01);
INSERT INTO #s3_threshold(tax_year,threshold) values(2014,21252.75);
INSERT INTO #s3_threshold(tax_year,threshold) values(2015,21450.55);
INSERT INTO #s3_threshold(tax_year,threshold) values(2016,21521.98);
INSERT INTO #s3_threshold(tax_year,threshold) values(2017,21752.75);
INSERT INTO #s3_threshold(tax_year,threshold) values(2018,22098.90);
INSERT INTO #s3_threshold(tax_year,threshold) values(2019,22472.53);
INSERT INTO #s3_threshold(tax_year,threshold) values(2020,22895.60);
INSERT INTO #s3_threshold(tax_year,threshold) values(2021,23230.77);
INSERT INTO #s3_threshold(tax_year,threshold) values(2022,24461.54);
INSERT INTO #s3_threshold(tax_year,threshold) values(2023,26197.80);
INSERT INTO #s3_threshold(tax_year,threshold) values(2024,27527.47);

DROP TABLE IF EXISTS #s3_ever_threshold;
CREATE TABLE #s3_ever_threshold(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL
PRIMARY KEY CLUSTERED(pent,snz_uid));

INSERT INTO #s3_ever_threshold(pent,snz_uid)

SELECT DISTINCT
pent
,snz_uid
FROM (SELECT 
	pent
	,i.snz_uid
	,tax_year
	FROM #s3_individuals p
	JOIN [$(idicleanversion)].[ir_clean].[ird_attachments_ir4s] i
	ON p.snz_uid = i.snz_uid
	JOIN #s3_xref_yr x
	ON i.snz_employer_ird_uid = x.snz_ird_uid
	AND year(i.ir_ir4_return_period_date) * 100 + 3 = x.dim_year_key
	JOIN #s3_threshold t
	ON year(i.ir_ir4_return_period_date) = t.tax_year
	GROUP BY pent,i.snz_uid,tax_year
	HAVING sum(i.ir_ir4_tot_sholder_sal_809_amt) > max(threshold)) e;

INSERT INTO #s3_pent_wp_yr_nonwage_dup(
pent
,snz_uid
,dim_year_key
,ir_type)

SELECT 
x.pent
,i.snz_uid
,dim_year_key,
ir_type = 'ir4s'
FROM #s3_individuals p
JOIN [$(idicleanversion)].[ir_clean].[ird_attachments_ir4s] i
ON p.snz_uid = i.snz_uid
JOIN #s3_xref_yr x
ON i.snz_employer_ird_uid = x.snz_ird_uid
AND year(i.ir_ir4_return_period_date) * 100 + 3 = x.dim_year_key
JOIN #s3_ever_threshold t
ON i.snz_uid = t.snz_uid
AND x.pent = t.pent;

/*Remove IR7 imputation where a non-IR7 annual filing associated with WPs already exists*/

DELETE i
FROM #s3_pent_wp_yr_nonwage_dup i
JOIN (SELECT 
		DISTINCT pent,dim_year_key
		FROM #s3_pent_wp_yr_nonwage_dup
		WHERE ir_type <> 'ir7p_missing') r
ON i.pent = r.pent
AND i.dim_year_key = r.dim_year_key
WHERE i.ir_type = 'ir7p_missing';

DROP TABLE IF EXISTS #s3_pent_wp_yr_nonwage;
CREATE TABLE #s3_pent_wp_yr_nonwage(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL,
dim_year_key			int				NOT NULL,
flag_ir3				tinyint			NOT NULL,
flag_ir7p				tinyint			NOT NULL,
flag_ir7p_imp			tinyint			NOT NULL,
flag_ir4s				tinyint			NOT NULL
PRIMARY KEY CLUSTERED(pent,snz_uid,dim_year_key));

INSERT INTO #s3_pent_wp_yr_nonwage(
pent
,snz_uid
,dim_year_key
,flag_ir3
,flag_ir7p
,flag_ir7p_imp
,flag_ir4s)

SELECT 
pent
,snz_uid
,dim_year_key
,flag_ir3	 = max(iif(ir_type = 'ir3',1,0))
,flag_ir7p	 = max(iif(ir_type = 'ir7p',1,0))
,flag_ir7p_imp = max(iif(ir_type = 'ir7p_missing',1,0))
,flag_ir4s	 = max(iif(ir_type = 'ir4s',1,0))
FROM #s3_pent_wp_yr_nonwage_dup 
GROUP BY pent,snz_uid,dim_year_key;

/*Working proprietor*/
DROP TABLE IF EXISTS #s3_xref_noyr;
CREATE TABLE #s3_xref_noyr(
snz_ird_uid				int				NOT NULL,
pent					char(10)		NOT NULL
PRIMARY KEY CLUSTERED(snz_ird_uid,pent));

INSERT INTO #s3_xref_noyr(
snz_ird_uid
,pent)

SELECT DISTINCT 
snz_ird_uid
,pent
FROM #s3_xref_yr

DROP TABLE IF EXISTS #s3_pent_wp;
CREATE TABLE #s3_pent_wp(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL
PRIMARY KEY CLUSTERED(pent,snz_uid));

/*Sole proprietors (IR number associated with an individual and with an enterprise_nbr)*/
INSERT INTO #s3_pent_wp(
pent
,snz_uid)

SELECT 
pent
,i.snz_uid
FROM #s3_individuals i
JOIN #s3_xref_noyr x
ON i.snz_ird_uid = x.snz_ird_uid;

/*From annual filing (IR number receiving self-employed income)*/
INSERT INTO #s3_pent_wp(
pent
,snz_uid)

SELECT DISTINCT
n.pent
,n.snz_uid
FROM #s3_pent_wp_yr_nonwage n
LEFT JOIN #s3_pent_wp w
ON n.pent = w.pent
AND n.snz_uid = w.snz_uid
WHERE w.pent IS NULL; 

/*Get W&S information at the pent level for self-employed*/
DROP TABLE IF EXISTS #s3_ent_wp;
CREATE TABLE #s3_ent_wp(
pent					char(10)		NOT NULL,
enterprise_nbr			char(10)		NOT NULL,
snz_uid					int				NOT NULL
PRIMARY KEY CLUSTERED(enterprise_nbr,snz_uid));

INSERT INTO #s3_ent_wp(
pent
,enterprise_nbr
,snz_uid)

SELECT 
w.pent
,enterprise_nbr
,snz_uid
FROM #s3_pent_wp w
JOIN #s3_pent p
ON w.pent = p.pent;

DROP TABLE IF EXISTS #s3_pent_emp_mth;
CREATE TABLE #s3_pent_emp_mth(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL,
dim_month_key			int				NOT NULL);

INSERT INTO #s3_pent_emp_mth(
pent
,snz_uid
,dim_month_key)

SELECT 
pent
,w.snz_uid
,dim_month_key
FROM #s3_ent_wp w
JOIN #s3_temp e
ON w.enterprise_nbr = e.enterprise_nbr
AND w.snz_uid = e.snz_uid;

DROP TABLE IF EXISTS #s3_pent_wp_yr_wage;
CREATE TABLE #s3_pent_wp_yr_wage(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL,
dim_year_key			int				NOT NULL
PRIMARY KEY CLUSTERED(pent,snz_uid,dim_year_key));

INSERT INTO #s3_pent_wp_yr_wage(
pent
,snz_uid
,dim_year_key)

SELECT DISTINCT
e.pent,
e.snz_uid,
dim_year_key
FROM #s3_pent_emp_mth e
JOIN #s3_pent_bal_date_IDI b
ON e.pent = b.pent
JOIN #s3_dim_bal_date_year_IDI d
ON b.balance_month_nbr = d.bal_month_nbr
AND e.dim_month_key BETWEEN d.bal_start_month_key AND d.bal_end_month_key;

DROP TABLE IF EXISTS #s3_pent_wp_yr;
CREATE TABLE #s3_pent_wp_yr(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL,
dim_year_key			int				NOT NULL,
WP_ems					tinyint			NOT NULL,
WP_ir3					tinyint			NOT NULL,
WP_ir7p					tinyint			NOT NULL,
WP_ir7p_imp				tinyint			NOT NULL,
WP_ir4s					tinyint			NOT NULL
PRIMARY KEY CLUSTERED(pent,snz_uid,dim_year_key));

INSERT INTO #s3_pent_wp_yr(
pent
,snz_uid
,dim_year_key
,WP_ems
,WP_ir3
,WP_ir7p
,WP_ir7p_imp
,WP_ir4s)

SELECT
pent = isnull(n.pent,w.pent),
snz_uid = isnull(n.snz_uid,w.snz_uid),
dim_year_key = isnull(n.dim_year_key,w.dim_year_key),
WP_ems = iif(w.dim_year_key IS NULL,0,1),
WP_ir3 = isnull(flag_ir3,0)
, WP_ir7p = isnull(flag_ir7p,0)
, WP_ir7p_imp = isnull(flag_ir7p_imp,0)
, WP_ir4s = isnull(flag_ir4s,0)
FROM #s3_pent_wp_yr_nonwage n
full JOIN #s3_pent_wp_yr_wage w
ON n.pent = w.pent
AND n.snz_uid = w.snz_uid
AND n.dim_year_key = w.dim_year_key
WHERE isnull(n.dim_year_key,w.dim_year_key) >= 200003; /*Truncate non-wage to EMS period*/

/*Remove tail only covered by EMS*/
DELETE FROM #s3_pent_wp_yr 
WHERE dim_year_key > $(wp_excl);

DROP TABLE IF EXISTS #s3_wp_yr;
CREATE TABLE #s3_wp_yr(
snz_uid					int				NOT NULL,
dim_year_key			int				NOT NULL,
n_businesses			tinyint			NOT NULL
PRIMARY KEY CLUSTERED(snz_uid,dim_year_key));

INSERT INTO #s3_wp_yr(
snz_uid
,dim_year_key
,n_businesses)

SELECT
snz_uid,
dim_year_key,
n_businesses = count(*)
FROM #s3_pent_wp_yr
GROUP BY snz_uid,dim_year_key;

/*WP indicator for working proprietors for prior and following year*/

DROP TABLE IF EXISTS #s3_pent_WP_yr_adjacent;
CREATE TABLE #s3_pent_WP_yr_adjacent(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL,
dim_year_key			int				NOT NULL,
WP_ems					tinyint			NOT NULL,
WP_ir3					tinyint			NOT NULL,
WP_ir7p					tinyint			NOT NULL,
WP_ir7p_imp				tinyint			NOT NULL,
WP_ir4s					tinyint			NOT NULL,
WP_prior_year			tinyint				NULL,
WP_next_year			tinyint				NULL
PRIMARY KEY CLUSTERED(pent,snz_uid,dim_year_key));

INSERT INTO #s3_pent_WP_yr_adjacent(
pent
,snz_uid
,dim_year_key
,WP_ems
,WP_ir3
,WP_ir7p
,WP_ir7p_imp
,WP_ir4s
,WP_prior_year
,WP_next_year)

SELECT 
c.pent
,c.snz_uid
,c.dim_year_key
,c.WP_ems
,c.WP_ir3
,c.WP_ir7p
,c.WP_ir7p_imp
,c.WP_ir4s
,WP_prior_year = case when p.dim_year_key IS NOT NULL then 1
				   when c.dim_year_key = 200003	   then null	
												   else 0 end
,WP_next_year = case when n.dim_year_key IS NOT NULL then 1
				   when c.dim_year_key >= $(current_year)03   then null	
												   else 0 end
FROM #s3_pent_wp_yr c
LEFT JOIN #s3_pent_wp_yr p	
ON c.pent = p.pent
AND c.snz_uid = p.snz_uid
AND c.dim_year_key = p.dim_year_key + 100	
LEFT JOIN #s3_pent_wp_yr n	
ON c.pent = n.pent
AND c.snz_uid = n.snz_uid
AND c.dim_year_key = n.dim_year_key - 100;	

DROP TABLE IF EXISTS #s3_pent_WP_yr_IDI;
CREATE TABLE #s3_pent_WP_yr_IDI(
pent					char(10)		NOT NULL,
snz_uid					int				NOT NULL,
dim_year_key			int				NOT NULL,
has_ems_inc				tinyint			NOT NULL,
has_ir3_inc				tinyint			NOT NULL,
has_ir7p_inc			tinyint			NOT NULL,
has_ir7p_imp_inc		tinyint			NOT NULL,
has_ir4s_inc			tinyint			NOT NULL,
multi_WP				tinyint			NOT NULL,
WP_prior_year			tinyint				NULL,
WP_next_year			tinyint				NULL,
adj_WP_count			float				NULL
PRIMARY KEY CLUSTERED(pent,snz_uid,dim_year_key));

INSERT INTO #s3_pent_WP_yr_IDI(
pent
,snz_uid
,dim_year_key
,has_ems_inc
,has_ir3_inc
,has_ir7p_inc
,has_ir7p_imp_inc
,has_ir4s_inc
,multi_WP
,WP_prior_year
,WP_next_year
,adj_WP_count)

SELECT
pent
,p.snz_uid
,p.dim_year_key
,WP_ems
,WP_ir3
,WP_ir7p
,WP_ir7p_imp
,WP_ir4s
,multi_WP = case when n_businesses > 1 then 1 else 0 end,
WP_prior_year,
WP_next_year,
adj_WP_count = case when n_businesses > 1					 then 1.0 / n_businesses	/*Multi-business owner (includes same-year transitions)*/
				  when WP_prior_year = 0 OR WP_next_year = 0 then 0.5				/*Known entry/exit*/
				  when WP_prior_year * WP_next_year = 1		 then 1.0				/*Known continuity*/
														 else null end			/*Adjustment not possible*/
FROM #s3_pent_wp_yr_adjacent p
JOIN #s3_wp_yr w
ON p.snz_uid = w.snz_uid
AND p.dim_year_key = w.dim_year_key;

drop table IF EXISTS #s3_pent
drop table IF EXISTS #s3_ent_pbn_repair
drop table IF EXISTS #s3_temp
drop table IF EXISTS #s3_pent_ec_mth
drop table IF EXISTS #s3_mth
drop table IF EXISTS #s3_ent_bal_date
drop table IF EXISTS #s3_pent_bal_date
drop table IF EXISTS #s3_pent_ent_mth
drop table IF EXISTS #s3_pent_bal_mth
drop table IF EXISTS #s3_pent_bal_ec
drop table IF EXISTS #s3_lbd_bal_year
drop table IF EXISTS #s3_dim_bal_date_year
drop table IF EXISTS #s3_pent_bal_date_IDI 
drop table IF EXISTS #s3_dim_bal_date_year_IDI 

/* Create a table with all months in the pent_IDI table for later use in optimising code with a loop and also to use with one-month_gap adjustments in spells */

DROP TABLE IF EXISTS #pent;
CREATE TABLE #pent (
pent			char(10)	NOT NULL,
enterprise_nbr	char(10)	NOT NULL,
start_month		int			NOT NULL,
end_month		int			NOT NULL
PRIMARY KEY CLUSTERED (pent,enterprise_nbr));

INSERT INTO #pent (
pent
,enterprise_nbr
,start_month
,end_month)

SELECT 
pent
,enterprise_nbr
,start_month
,end_month
FROM #s2_pent_IDI;

DROP TABLE IF EXISTS #mths;
CREATE TABLE #mths(
dim_month_key	int		NOT NULL,
m_day			int		NOT NULL,
m_mth			int		NOT NULL,
m_year			int		NOT NULL,
PRIMARY KEY (dim_month_key));

INSERT INTO #mths (
dim_month_key,
m_day,
m_mth,
m_year)

SELECT DISTINCT
dim_month_key = start_month 
,m_day = 15
,m_mth = start_month - FLOOR(start_month / 100) * 100
,m_year = FLOOR(start_month / 100)
FROM #pent; 

DROP TABLE IF EXISTS #cm_ent_emp_mth;
CREATE TABLE #cm_ent_emp_mth (
	enterprise_nbr			char(10)		NOT NULL,
	pbn_nbr					char(10)			NULL,
	snz_uid					int				NOT NULL,
	snz_ird_uid				int				NOT NULL,
	snz_employer_ird_uid	int				NOT NULL,
	dim_month_key			int				NOT NULL,
	gross_earn				decimal(13,2)	NOT NULL,
	employee_start_date		date				NULL,
	employee_end_date		date				NULL);

INSERT INTO #cm_ent_emp_mth (
enterprise_nbr
,pbn_nbr
,snz_uid
,snz_ird_uid
,snz_employer_ird_uid
,dim_month_key
,gross_earn
,employee_start_date
,employee_end_date)

SELECT
enterprise_nbr = ISNULL(i.[ir_ems_enterprise_nbr],inc_pbn_enterprise_nbr)
,pbn_nbr = ISNULL(i.[ir_ems_pbn_nbr],inc_pbn_pbn_nbr)
,i.snz_uid
,i.snz_ird_uid
,i.snz_employer_ird_uid
,dim_month_key = YEAR(ir_ems_return_period_date) * 100 + MONTH(ir_ems_return_period_date)
,gross_earn = ISNULL(ir_ems_gross_earnings_amt,0)
,employee_start_date = ir_ems_employee_start_date
,employee_end_date = ir_ems_employee_end_date
FROM  [$(idicleanversion)].ir_clean.ird_ems i
LEFT JOIN [$(idicleanversion)].[data].[income_pbn_ent] r
ON i.snz_ird_uid = r.snz_ird_uid
AND i.snz_employer_ird_uid = r.snz_employer_ird_uid
AND YEAR(i.ir_ems_return_period_date) * 100 + MONTH(i.ir_ems_return_period_date) = r.inc_pbn_dim_month_key
WHERE i.■■■■■■■■■■■■■■■				
AND ir_ems_income_source_code = 'W&S'	
AND ir_ems_gross_earnings_amt > 0
AND ir_ems_snz_unique_nbr = 1
AND ISNULL(i.[ir_ems_enterprise_nbr],r.[inc_pbn_enterprise_nbr]) is not null;

DELETE FROM #cm_ent_emp_mth
WHERE dim_month_key NOT IN (
SELECT
dim_month_key
FROM #mths
);

CREATE CLUSTERED INDEX i ON #cm_ent_emp_mth  (dim_month_key);

DROP TABLE IF EXISTS #cm_ent_ind_emp_mth;
CREATE TABLE #cm_ent_ind_emp_mth ( 
    pent                    char(10)		NOT NULL,
	enterprise_nbr   		char(10)		NOT NULL,
	snz_uid					int				NOT NULL,
	snz_ird_uid				int				NOT NULL,
	snz_employer_ird_uid    int             NOT NULL,
	uid_ent_index			varchar(18)		NOT NULL,
	dim_month_key			int				NOT NULL,
	min_pbn_nbr				char(10)		NOT NULL,
	max_pbn_nbr				char(10)		NOT NULL,
	gross_earn				decimal(13,2)	NOT NULL); 

DECLARE @mth int
SELECT @mth = MIN(dim_month_key) FROM #mths

DECLARE @last_mth int
SELECT @last_mth = MAX(dim_month_key) FROM #mths

WHILE @mth <= @last_mth
BEGIN
	INSERT INTO #cm_ent_ind_emp_mth (
	pent,
	enterprise_nbr,
	snz_uid,
	snz_ird_uid,
	snz_employer_ird_uid,
	uid_ent_index,
	dim_month_key,
	min_pbn_nbr,
	max_pbn_nbr,
	gross_earn)

	SELECT 
	p.pent
	,e.enterprise_nbr
	,snz_uid
	,snz_ird_uid
	,snz_employer_ird_uid
	,uid_ent_index = MAX(CONCAT(snz_uid,e.enterprise_nbr))
	,dim_month_key = MAX(dim_month_key)
	,min_pbn_nbr = MIN(pbn_nbr)
	,max_pbn_nbr = MAX(pbn_nbr)
	,gross_earn = SUM(gross_earn)
	FROM #pent p
	JOIN #cm_ent_emp_mth e ON p.enterprise_nbr = e.enterprise_nbr
	WHERE dim_month_key = @mth 
	GROUP BY 
	pent,
	e.enterprise_nbr,
	snz_uid,
	snz_ird_uid,
	snz_employer_ird_uid

	SET @mth = (DATEPART(YEAR, DATEADD(MONTH, 1, (CONVERT(VARCHAR, @mth) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, 1, (CONVERT(VARCHAR, @mth) + '01')))
	END;
	
DELETE FROM #cm_ent_ind_emp_mth
WHERE gross_earn < 1;

DROP TABLE IF EXISTS #cm_ent_emp_mth;

/*-----------------------------------------------------------------------------------------------
Get Job Spells
-----------------------------------------------------------------------------------------------*/

/* Identify first and last months and the count of months for the individual-enterprise combination */
DROP TABLE IF EXISTS #cm_enterprise_nbr_emp_span;
CREATE TABLE #cm_enterprise_nbr_emp_span (
pent           char(10) NOT NULL,
enterprise_nbr char(10) NOT NULL,
snz_uid        int NOT NULL,
uid_ent_index  varchar(18) NOT NULL,
first_month    int NOT NULL,
last_month     int NOT NULL,
n_month        int NOT NULL
PRIMARY KEY CLUSTERED (pent, enterprise_nbr, snz_uid, uid_ent_index, first_month, last_month, n_month));

INSERT INTO #cm_enterprise_nbr_emp_span (
pent
,enterprise_nbr
,snz_uid
,uid_ent_index
,first_month
,last_month
,n_month)

SELECT
pent
,enterprise_nbr
,snz_uid
,uid_ent_index
,first_month = MIN(dim_month_key)
,last_month = MAX(dim_month_key)
,n_month = COUNT(*)
FROM #cm_ent_ind_emp_mth
GROUP BY 
pent
,enterprise_nbr
,snz_uid
,uid_ent_index;

/* Identify gaps  in employment spans by comparing the number of months between the start and end dates to the number of months we have employment data for for that individual-enterprise combination */
DROP TABLE IF EXISTS #cm_ent_nbr_emp_with_gaps;
CREATE TABLE #cm_ent_nbr_emp_with_gaps ( 
pent           char(10) NOT NULL,
enterprise_nbr char(10) NOT NULL,
snz_uid        int NOT NULL,
uid_ent_index  varchar(18) NOT NULL,
first_month    int NOT NULL,
last_month     int NOT NULL,
n_month        int NOT NULL
PRIMARY KEY CLUSTERED (pent, enterprise_nbr, snz_uid, uid_ent_index));

INSERT INTO #cm_ent_nbr_emp_with_gaps ( 
pent
,enterprise_nbr
,snz_uid
,uid_ent_index
,first_month
,last_month
,n_month)

SELECT
pent
,enterprise_nbr
,snz_uid
,uid_ent_index
,first_month
,last_month
,n_month
FROM #cm_enterprise_nbr_emp_span 
WHERE DATEDIFF(MONTH, CONVERT(DATETIME, CONVERT(VARCHAR, first_month) + '01'), CONVERT(DATETIME, CONVERT(VARCHAR, last_month)   + '01')) + 1 > n_month + 1;

/* Employing months in spans with potential gaps */
DROP TABLE IF EXISTS #cm_ent_nbr_emp_wpg; 
CREATE TABLE #cm_ent_nbr_emp_wpg ( 
pent           char(10) NOT NULL,
enterprise_nbr char(10) NOT NULL,
snz_uid        int NOT NULL,
uid_ent_index  varchar(18) NOT NULL,
dim_month_key  int NOT NULL);

INSERT INTO #cm_ent_nbr_emp_wpg ( 
pent
,enterprise_nbr
,snz_uid
,uid_ent_index
,dim_month_key)

SELECT
m.pent
,m.enterprise_nbr
,m.snz_uid
,m.uid_ent_index
,dim_month_key
FROM #cm_ent_ind_emp_mth m
JOIN #cm_ent_nbr_emp_with_gaps g ON m.uid_ent_index = g.uid_ent_index;

DROP TABLE IF EXISTS #cm_ent_nbr_emp_with_gaps;

DROP TABLE IF EXISTS #cm_two_month;
CREATE TABLE #cm_two_month (
dim_month_key    int NOT NULL,
dim_month_key_f1 int NOT NULL,
dim_month_key_f2 int NOT NULL
PRIMARY KEY CLUSTERED (dim_month_key, dim_month_key_f1, dim_month_key_f2));

INSERT INTO #cm_two_month (
dim_month_key
,dim_month_key_f1
,dim_month_key_f2)

/* Get each year month plus the two months following the last year month in the mths table */
SELECT 
dim_month_key = (DATEPART(YEAR, (CONVERT(VARCHAR, dim_month_key) + '01')) * 100) + DATEPART(MONTH, (CONVERT(VARCHAR, dim_month_key) + '01'))
,dim_month_key_f1 = (DATEPART(YEAR, DATEADD(MONTH, 1, (CONVERT(VARCHAR, dim_month_key) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, 1, (CONVERT(VARCHAR, dim_month_key) + '01')))
,dim_month_key_f2 = (DATEPART(YEAR, DATEADD(MONTH, 2, (CONVERT(VARCHAR, dim_month_key) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, 2, (CONVERT(VARCHAR, dim_month_key) + '01')))
FROM #mths;

INSERT INTO #cm_two_month ( 
dim_month_key
,dim_month_key_f1
,dim_month_key_f2)

/* Add in the two months preceding the earliest year month in the mths table */
SELECT 
dim_month_key = (DATEPART(YEAR, (DATEADD(MONTH, -2, (CONVERT(VARCHAR, dim_month_key) + '01')))) * 100) + DATEPART(MONTH, (DATEADD(MONTH, -2, (CONVERT(VARCHAR, dim_month_key) + '01'))))
,dim_month_key_f1 = (DATEPART(YEAR, (DATEADD(MONTH, -1, (CONVERT(VARCHAR, dim_month_key) + '01')))) * 100) + DATEPART(MONTH, (DATEADD(MONTH, -1, (CONVERT(VARCHAR, dim_month_key) + '01'))))
,dim_month_key_f2 = (DATEPART(YEAR, (CONVERT(VARCHAR, dim_month_key) + '01')) * 100) + DATEPART(MONTH, (CONVERT(VARCHAR, dim_month_key) + '01'))
FROM #mths
WHERE dim_month_key BETWEEN 
(SELECT MIN(dim_month_key) FROM #mths) 
AND (SELECT (DATEPART(YEAR, DATEADD(MONTH, 1, (CONVERT(VARCHAR, MIN(dim_month_key)) + '01'))) * 100) + DATEPART(MONTH, DATEADD(MONTH, 1, (CONVERT(VARCHAR, MIN(dim_month_key)) + '01'))) FROM #mths);

/* Spell ends */
DROP TABLE IF EXISTS #cm_spell_end; 
CREATE TABLE #cm_spell_end ( 
pent           char(10) NOT NULL,
enterprise_nbr char(10) NOT NULL,
snz_uid        int NOT NULL,
uid_ent_index  varchar(18) NOT NULL,
dim_month_key  int NOT NULL);

/* Add in spells ends for observations with potential gaps (Identifies cases where the following two months are not months that have potential gaps - these are the spell ends) */
INSERT INTO #cm_spell_end ( 
pent,
enterprise_nbr,
snz_uid, 
uid_ent_index, 
dim_month_key)

SELECT
wpg.pent
,wpg.enterprise_nbr
,wpg.snz_uid
,wpg.uid_ent_index
,wpg.dim_month_key
FROM #cm_ent_nbr_emp_wpg wpg
JOIN #cm_two_month tm ON wpg.dim_month_key = tm.dim_month_key
LEFT JOIN #cm_ent_nbr_emp_wpg f1 ON wpg.uid_ent_index = f1.uid_ent_index AND tm.dim_month_key_f1 = f1.dim_month_key
LEFT JOIN #cm_ent_nbr_emp_wpg f2 ON wpg.uid_ent_index = f2.uid_ent_index AND tm.dim_month_key_f2 = f2.dim_month_key
WHERE f1.dim_month_key IS NULL
AND f2.dim_month_key IS NULL;

/* Add in spell ends for observatons without gaps by finding the last month of the employment span for each individual-enterprise combination where they do not appear in the above dataset for spell ends that had potential gaps */
INSERT INTO #cm_spell_end (
pent,
enterprise_nbr,
snz_uid, 
uid_ent_index, 
dim_month_key)

SELECT
esp.pent
,esp.enterprise_nbr
,esp.snz_uid
,esp.uid_ent_index
,dim_month_key = esp.last_month 
FROM #cm_enterprise_nbr_emp_span esp 
WHERE uid_ent_index NOT IN (
	SELECT
	uid_ent_index 
	FROM #cm_spell_end 
	);

/* Spells starts */
DROP TABLE IF EXISTS #cm_spell_start;
CREATE TABLE #cm_spell_start ( 
pent           char(10) NOT NULL,
enterprise_nbr char(10) NOT NULL,
snz_uid        int NOT NULL,
uid_ent_index  varchar(18) NOT NULL,
dim_month_key  int NOT NULL);

/* Add spell starts for observatios with potential gaps (Identifies cases where the preceding two months are not months that have potential gaps - these are the spell starts) */
INSERT INTO #cm_spell_start ( 
pent,
enterprise_nbr,
snz_uid, 
uid_ent_index, 
dim_month_key)

SELECT
wpg.pent
,wpg.enterprise_nbr
,wpg.snz_uid
,wpg.uid_ent_index
,wpg.dim_month_key
FROM #cm_ent_nbr_emp_wpg wpg
JOIN #cm_two_month tm ON wpg.dim_month_key = tm.dim_month_key_f2
LEFT JOIN #cm_ent_nbr_emp_wpg l1 ON wpg.uid_ent_index = l1.uid_ent_index AND tm.dim_month_key_f1 = l1.dim_month_key
LEFT JOIN #cm_ent_nbr_emp_wpg l2 ON wpg.uid_ent_index = l2.uid_ent_index AND tm.dim_month_key = l2.dim_month_key
WHERE l1.dim_month_key IS NULL
AND l2.dim_month_key IS NULL;

/* Add spell starts for observations without gaps by finding the first month of the employment span for each individual-enterprise combination where they do not appear in the above dataset for spell starts with potential gaps */
INSERT INTO #cm_spell_start ( 
pent,
enterprise_nbr,
snz_uid, 
uid_ent_index, 
dim_month_key)

SELECT
esp.pent
,esp.enterprise_nbr
,esp.snz_uid
,esp.uid_ent_index
,esp.first_month as dim_month_key
FROM #cm_enterprise_nbr_emp_span esp 
WHERE esp.uid_ent_index NOT IN (
	SELECT
	uid_ent_index 
	FROM #cm_spell_start 
	);

DROP TABLE IF EXISTS #cm_ent_nbr_emp_wpg;
DROP TABLE IF EXISTS #cm_enterprise_nbr_emp_span;

DROP TABLE IF EXISTS #cm_spell_pooled; 
CREATE TABLE #cm_spell_pooled ( 
pent           char(10) NOT NULL,
enterprise_nbr char(10) NOT NULL,
snz_uid        int NOT NULL,
uid_ent_index  varchar(18) NOT NULL,
dim_month_key  int NOT NULL,
spell_start    tinyint NOT NULL,
spell_end      tinyint NOT NULL);

INSERT INTO #cm_spell_pooled (
pent,
enterprise_nbr,
snz_uid, 
uid_ent_index, 
dim_month_key,
spell_start,
spell_end)

SELECT
pent = ISNULL(s.pent, e.pent)
,enterprise_nbr = ISNULL(s.enterprise_nbr, e.enterprise_nbr)
,snz_uid = ISNULL(s.snz_uid, e.snz_uid)
,uid_ent_index = ISNULL(s.uid_ent_index, e.uid_ent_index)
,dim_month_key = ISNULL(s.dim_month_key, e.dim_month_key)
,spell_start = CASE WHEN s.enterprise_nbr is null THEN 0 ELSE 1 END
,spell_end = CASE WHEN e.enterprise_nbr is null THEN 0 ELSE 1 END
FROM #cm_spell_start s
FULL JOIN #cm_spell_end e 
ON s.uid_ent_index = e.uid_ent_index 
AND s.dim_month_key = e.dim_month_key;

DROP TABLE IF EXISTS #cm_spell_start;
DROP TABLE IF EXISTS #cm_spell_end;

/* Job spells */
DROP TABLE IF EXISTS #cm_job_spells;
CREATE TABLE #cm_job_spells (
pent           char(10) NOT NULL,
enterprise_nbr char(10) NOT NULL,
snz_uid        int NOT NULL,
uid_ent_index  varchar(18) NOT NULL,
dim_month_key  int NOT NULL,
spell_start    tinyint NOT NULL,
spell_end      tinyint NOT NULL);

INSERT INTO #cm_job_spells (
pent,
enterprise_nbr,
snz_uid,
uid_ent_index,
dim_month_key,
spell_start,
spell_end
)

SELECT DISTINCT
e.pent
,e.enterprise_nbr
,e.snz_uid
,e.uid_ent_index
,e.dim_month_key
,spell_start = ISNULL(spell_start, 0)
,spell_end = ISNULL(spell_end, 0)
FROM #cm_ent_ind_emp_mth e
LEFT JOIN #cm_spell_pooled s 
ON e.uid_ent_index = s.uid_ent_index 
AND e.dim_month_key = s.dim_month_key;

DROP TABLE IF EXISTS #cm_spell_pooled;

/*-----------------------------------------------------------------------------------------------
Add in Joint filer flag
-----------------------------------------------------------------------------------------------*/
/* List the distinct Employer-PENT relationships for each month */
DROP TABLE IF EXISTS #ems_payer_pent_mth;
CREATE TABLE #ems_payer_pent_mth (
	snz_employer_ird_uid	int			NOT NULL,
	pent					char(10)	NOT NULL,
	dim_month_key			int			NOT NULL
	PRIMARY KEY CLUSTERED (snz_employer_ird_uid,pent,dim_month_key));

INSERT INTO #ems_payer_pent_mth (
snz_employer_ird_uid,
pent,
dim_month_key)

SELECT DISTINCT
snz_employer_ird_uid
,pent
,dim_month_key
FROM #cm_ent_ind_emp_mth;

/* Get all of the unique employer id's for each month that have multiple PENTs associated with them */
DROP TABLE IF EXISTS #joint_ir_mth;
CREATE TABLE #joint_ir_mth(
snz_employer_ird_uid	int			NOT NULL,
dim_month_key			int			NOT NULL
PRIMARY KEY CLUSTERED (snz_employer_ird_uid,dim_month_key));

INSERT INTO #joint_ir_mth (
snz_employer_ird_uid,
dim_month_key)

SELECT
snz_employer_ird_uid
,dim_month_key
FROM #ems_payer_pent_mth
GROUP BY
snz_employer_ird_uid
,dim_month_key
HAVING MIN(pent) <> MAX(pent);

/* Get list of employers and employees for each month where individuals are employed by an employer that files for multiple PENTs */
DROP TABLE IF EXISTS #joint_ems_payer_emp_mth;
CREATE TABLE #joint_ems_payer_emp_mth (
	snz_employer_ird_uid	int				NOT NULL,
	snz_uid					int				NOT NULL,
	dim_month_key			int				NOT NULL
	PRIMARY KEY CLUSTERED (snz_employer_ird_uid,snz_uid,dim_month_key));

INSERT INTO #joint_ems_payer_emp_mth (
snz_employer_ird_uid,
snz_uid,
dim_month_key)

SELECT 
e.snz_employer_ird_uid
,snz_uid
,e.dim_month_key
FROM #cm_ent_ind_emp_mth e
JOIN #joint_ir_mth j
ON e.snz_employer_ird_uid = j.snz_employer_ird_uid
AND e.dim_month_key = j.dim_month_key;

/* Some individuals are paid directly by joint filer member (ie, not via a joint filing employer IR), restrict to subset of interest (joint filing employers and employees) */
DROP TABLE IF EXISTS #ems_joint_subset;
CREATE TABLE #ems_joint_subset (
	snz_employer_ird_uid	int				NOT NULL,
	snz_uid					int				NOT NULL,
	dim_month_key			int				NOT NULL,
	pent					char(10)		NOT NULL,
	min_pbn_nbr			    char(10)		NOT NULL,
	max_pbn_nbr			    char(10)		NOT NULL
    PRIMARY KEY CLUSTERED (snz_employer_ird_uid,pent,min_pbn_nbr,max_pbn_nbr,dim_month_key,snz_uid));

INSERT INTO #ems_joint_subset (
snz_employer_ird_uid,
snz_uid,
dim_month_key,
pent,
min_pbn_nbr,
max_pbn_nbr)

SELECT 
snz_employer_ird_uid
,e.snz_uid
,e.dim_month_key
,e.pent
,min_pbn_nbr
,max_pbn_nbr
FROM #cm_ent_ind_emp_mth e
JOIN (
        SELECT DISTINCT
        pent
		,p.dim_month_key
		FROM #ems_payer_pent_mth p
		JOIN #joint_ir_mth j
		on p.snz_employer_ird_uid = j.snz_employer_ird_uid
		and p.dim_month_key = j.dim_month_key
		) p	 /* Firm subject to joint filing */
ON e.pent = p.pent
AND e.dim_month_key = p.dim_month_key
JOIN (
       SELECT DISTINCT
       snz_uid
	   ,dim_month_key
		FROM #joint_ems_payer_emp_mth
		) j /* Employee subject to joint filing */
ON e.snz_uid = j.snz_uid
AND e.dim_month_key = j.dim_month_key;

DROP TABLE IF EXISTS #cm_ent_ind_emp_mth;
DROP TABLE IF EXISTS #ems_payer_pent_mth;
DROP TABLE IF EXISTS #joint_ems_payer_emp_mth;
 
/* Create joint_filed flag for all individual-enterprise combinations for each month where those people work for an employer who is subject to joint filing */
DROP TABLE IF EXISTS #joint_ems_pent_emp_mth;
CREATE TABLE #joint_ems_pent_emp_mth (
	pent					char(10)		NOT NULL,
	snz_uid					int				NOT NULL,
	dim_month_key			int				NOT NULL,
	snz_employer_ird_uid	int				NOT NULL,
	joint_filed		    	tinyint			NOT NULL
	PRIMARY KEY CLUSTERED (pent,snz_uid,dim_month_key,snz_employer_ird_uid));

INSERT INTO #joint_ems_pent_emp_mth (
pent,
snz_uid,
dim_month_key,
snz_employer_ird_uid,
joint_filed)

SELECT
e.pent
,e.snz_uid
,e.dim_month_key
,e.snz_employer_ird_uid
,joint_filed = CASE WHEN i.snz_employer_ird_uid is not null THEN 1 ELSE 0 END
FROM #ems_joint_subset e
LEFT JOIN #joint_ir_mth i 
ON e.snz_employer_ird_uid = i.snz_employer_ird_uid 
AND e.dim_month_key = i.dim_month_key;

DROP TABLE IF EXISTS #ems_joint_subset;
DROP TABLE IF EXISTS #joint_ir_mth;

/*-----------------------------------------------------------------------------------------------
Add Repeat Employer within 12 months indicator
-----------------------------------------------------------------------------------------------*/
/* Get observations thaht are the months for spell starts and/or spell ends */
DROP TABLE IF EXISTS #twelvemonths;

;WITH spells AS (
SELECT 
*
FROM #cm_job_spells
WHERE spell_start = 1 OR spell_end = 1
)

/* Get the next observed month of employment for the person and enterprise which will either by the spell end or the next spell start for that individual */
,nextmonth AS (
SELECT
*
,lead_nextmonth = LEAD(dim_month_key,1,0) OVER(PARTITION BY [uid_ent_index]
                ORDER BY [uid_ent_index], [dim_month_key]) 
FROM spells
)

/* Turn spell start and end from binary indicators into the corresponsing dim_month_key for the spell starts and ends */
,pivoted AS (
SELECT 
pent
,enterprise_nbr
,snz_uid
,uid_ent_index
,spell_start = CASE WHEN spell_start = 1 THEN dim_month_key ELSE null END
,spell_end = CASE WHEN (spell_start = 1 and spell_end = 1) THEN dim_month_key 
                 WHEN spell_start = 1 THEN lead_nextmonth ELSE null END
FROM nextmonth
WHERE CASE WHEN spell_start = 1 THEN dim_month_key ELSE null END is not null 
AND CASE WHEN (spell_start = 1 and spell_end = 1) THEN dim_month_key 
         WHEN spell_start = 1 THEN lead_nextmonth ELSE null END is not null
)

/* Get the next spell start and preceding spell end for person and enterprise */
,surroundspells AS (
SELECT 
*
,spell_start_date = CAST(CONCAT(LEFT(spell_start,4),'-',RIGHT(spell_start,2),'-01') AS DATE) 
,spell_end_date = CAST(CONCAT(LEFT(spell_end,4),'-',RIGHT(spell_end,2),'-01') AS DATE)
,lead_nextspell = LEAD(spell_start,1,0) OVER(PARTITION BY [uid_ent_index] ORDER BY [uid_ent_index], [spell_start]) 
,lag_lastspell = LAG(spell_end,1,0) OVER(PARTITION BY [uid_ent_index] ORDER BY [uid_ent_index], [spell_end]) 
FROM pivoted
)

/* Where there is a following spell, get the date a year from the end of the spell, when there is a previous spell, get the date a year before the start */
SELECT 
*
,lead_date = CASE WHEN lead_nextspell <> 0 THEN CAST(CONCAT(LEFT(lead_nextspell,4),'-',RIGHT(lead_nextspell,2),'-01') AS DATE) ELSE NULL END
,lag_date = CASE WHEN lag_lastspell <> 0 THEN CAST(CONCAT(LEFT(lag_lastspell,4),'-',RIGHT(lag_lastspell,2),'-01') AS DATE) ELSE NULL END
,end_plus12m = CASE WHEN lead_nextspell <> 0 THEN DATEADD(MONTH,12,spell_end_date) ELSE NULL END
,start_minus12m = CASE WHEN lag_lastspell <> 0 THEN DATEADD(MONTH,-12,spell_start_date) ELSE NULL END
INTO #twelvemonths
FROM surroundspells;

DROP TABLE IF EXISTS #repeat_emp12m;
CREATE TABLE #repeat_emp12m (
pent                char(10)    NOT NULL,
enterprise_nbr      char(10)    NOT NULL,
snz_uid             int         NOT NULL,
uid_ent_index       varchar(18) NOT NULL,
spell_start         int         NOT NULL,
spell_end           int         NOT NULL,
repeat_employer_12m	tinyint     NOT NULL);

INSERT INTO #repeat_emp12m (
pent,
enterprise_nbr,
snz_uid,
uid_ent_index,
spell_start,
spell_end,
repeat_employer_12m)

/* Determine if the person worked with the employer within 12 month of the spells start and spell end */
SELECT 
pent
,enterprise_nbr
,snz_uid
,uid_ent_index
,spell_start
,spell_end
,repeat_employer_12m = CASE WHEN lead_date <= end_plus12m or lag_date >= start_minus12m THEN 1 ELSE 0 END
FROM #twelvemonths;

DROP TABLE IF EXISTS #twelvemonths;

/*-----------------------------------------------------------------------------------------------
Add in one_month_gap_count variable
-----------------------------------------------------------------------------------------------*/
/* Determine next month for each observation */
DROP TABLE IF EXISTS #comparemonths;

;with monthplus1 as (
SELECT 
js.*
,spell_start_date = re.spell_start  
,spell_end_date = re.spell_end
,month_date = CAST(CONCAT(LEFT(dim_month_key,4),'-',RIGHT(dim_month_key,2),'-01') AS DATE)
,next_month = DATEADD(MONTH,1,CAST(CONCAT(LEFT(dim_month_key,4),'-',RIGHT(dim_month_key,2),'-01') AS DATE))
FROM #cm_job_spells js
LEFT JOIN #repeat_emp12m re 
ON js.uid_ent_index = re.uid_ent_index 
AND js.dim_month_key between re.spell_start and re.spell_end
)

/* Convert back to integer */
,nextmonth as (
SELECT
*,
next_month_key = YEAR(next_month)*100 + MONTH(next_month)
FROM monthplus1
)

/* Get the next observed month of employment for the person and enterprise */
,getnextmonth as (
SELECT
*
,lead_nextmonth = LEAD(dim_month_key,1,0) OVER(PARTITION BY [uid_ent_index]
                ORDER BY [uid_ent_index], [dim_month_key]) 
FROM nextmonth
)

/* If the chronological next month is equal to next observed month then no gap in employment */
SELECT
*
,case when next_month_key = lead_nextmonth then 0 else 1 end as one_month_gap
INTO #comparemonths
FROM getnextmonth
WHERE spell_end = 0; 

DROP TABLE IF EXISTS #one_month_gaps;
CREATE TABLE #one_month_gaps (
uid_ent_index  varchar(18)      NOT NULL,
spell_start_date    int         NOT NULL,
spell_end_date      int         NOT NULL,
one_month_gaps 	    int         NOT NULL);

INSERT INTO #one_month_gaps (
uid_ent_index,
spell_start_date,
spell_end_date,
one_month_gaps)

SELECT
uid_ent_index
,spell_start_date
,spell_end_date
,one_month_gaps = SUM(isnull(one_month_gap,0))
FROM #comparemonths
GROUP BY
uid_ent_index
,spell_start_date
,spell_end_date;

DROP TABLE IF EXISTS #comparemonths;

/*-------------------------------------------------------------------------------------------------
Employment spells with event information
-------------------------------------------------------------------------------------------------*/
/* Get sum of joint-filed months between start and end of employment spell */
DROP TABLE IF EXISTS #join_wp;

;WITH join_jf as (
SELECT 
re.snz_uid
,re.pent
,re.enterprise_nbr
,re.uid_ent_index
,re.spell_start
,re.spell_end
,re.repeat_employer_12m
,sum_joint_filed = SUM(isnull(joint_filed,0)) 
FROM #repeat_emp12m re
LEFT JOIN #joint_ems_pent_emp_mth jf 
ON re.pent = jf.pent 
AND re.snz_uid = jf.snz_uid 
AND jf.dim_month_key between re.spell_start and re.spell_end
GROUP BY
re.snz_uid
,re.pent
,re.enterprise_nbr
,re.uid_ent_index
,re.spell_start
,re.spell_end
,re.repeat_employer_12m
)

/* Make joint_filer flag if the employee-employer relationship had a joint-filing during spell and add in the count of one month gaps in employment spell */
,join_omg as (
SELECT 
js.snz_uid
,js.pent
,js.enterprise_nbr
,js.spell_start
,js.spell_end
,js.repeat_employer_12m
,joint_filer = CASE WHEN js.sum_joint_filed >= 1 THEN 1 ELSE 0 END
,one_month_gap_count = omg.one_month_gaps
FROM join_jf js
LEFT JOIN #one_month_gaps omg 
ON js.uid_ent_index = omg.uid_ent_index 
AND js.spell_start = omg.spell_start_date 
AND js.spell_end = omg.spell_end_date 
)

/* Join working propriertors table and count the number of reference tax years within employment spell */
SELECT 
es.snz_uid
,es.pent
,enterprise_nbr
,spell_start
,spell_end
,repeat_employer_12m
,joint_filer
,one_month_gap_count
,sum_wp = SUM(isnull(wp.year_count,0)) 
INTO #join_wp
FROM join_omg es
LEFT JOIN ( SELECT 
            * 
			,1 as year_count
			FROM #s3_pent_WP_yr_IDI
			WHERE dim_year_key is not null)
			wp 
ON es.pent = wp.pent 
AND es.snz_uid = wp.snz_uid 
AND wp.dim_year_key between spell_start and spell_end
GROUP BY
es.snz_uid
,es.pent
,enterprise_nbr
,spell_start
,spell_end
,repeat_employer_12m
,joint_filer
,one_month_gap_count;

/* If the person was a working proprietor of the pent at any time during Employment spell the indicator notes they are a working proprietor */
DROP TABLE IF EXISTS $(targetdb).$(targetschema).employment_spells;

CREATE TABLE $(targetdb).$(targetschema).employment_spells(
snz_uid             int         NOT NULL,
pent                char(10)    NOT NULL,
enterprise_nbr      char(10)    NOT NULL,
[data_source]       char(10)    NOT NULL,
spell_start         int         NOT NULL,
spell_end           int         NOT NULL,
working_proprietor  tinyint     NOT NULL,
repeat_employer_12m tinyint     NOT NULL,
joint_filer	        tinyint     NOT NULL,
one_month_gap_count tinyint     NULL
PRIMARY KEY (snz_uid,pent,enterprise_nbr,spell_start,spell_end));

INSERT INTO $(targetdb).$(targetschema).employment_spells(
snz_uid
,pent
,enterprise_nbr
,[data_source]
,spell_start
,spell_end
,working_proprietor
,repeat_employer_12m
,joint_filer
,one_month_gap_count)

SELECT 
snz_uid
,pent
,enterprise_nbr
,[data_source] ='ird_ems'
,spell_start
,spell_end
,working_proprietor = CASE WHEN sum_wp >= 1 THEN 1 ELSE 0 END
,repeat_employer_12m
,joint_filer
,one_month_gap_count
FROM #join_wp;

DROP TABLE IF EXISTS #repeat_emp12m;
DROP TABLE IF EXISTS #joint_ems_pent_emp_mth;
DROP TABLE IF EXISTS #one_month_gaps;
DROP TABLE IF EXISTS #join_wp;

DROP TABLE IF EXISTS  #s3_pent_bal_date_IDI; 
DROP TABLE IF EXISTS #s1_ent_pbn_repair_IDI; 
DROP TABLE IF EXISTS #s2_pent_IDI;