Thursday, March 28, 2024

DATABRICKS Pyspark Important

 ================DUPLICATE===============

# Define a Window specification

# Window over the duplicate-key columns; rows inside each group are ordered
# so that exactly one row per group receives sequence number 1.
window_spec = Window.partitionBy(*columns_to_check).orderBy("SLS_HUB_INTRNL_ACCT_ID")

# Use row_number(), not rank(): rank() assigns the same value to ties, so
# tied duplicate rows would all get rank 1 and escape the "> 1" filter.
# row_number() guarantees a unique sequence per partition, keeping every
# row after the first as a reject. Filter on the boolean column directly
# instead of comparing it to the string "true".
df_final_prism_repartition_reject = (
    df_final_prism_repartition
    .withColumn("is_duplicate", F.row_number().over(window_spec) > 1)
    .filter(col("is_duplicate"))
)




===========READING FILE ===========


# Read the tilde-delimited work file as CSV with an explicit schema.
csv_options = {
    "header": True,
    "delimiter": "~",
    "quote": "\"",
    "encoding": "ISO-8859-1",
    "dateFormat": "MM/dd/yyyy",
}

df = (
    spark.read.format("csv")
    .options(**csv_options)
    .schema(input_file_read_schema)
    .load(work_file_path)
)

  

=================#Current CST Time===============

#Current CST Time

# Current timestamp in US Central time, formatted for load auditing.
# zoneinfo (stdlib, Python 3.9+) replaces the third-party pytz dependency;
# ZoneInfo applies the same CST/CDT daylight-saving rules from the IANA
# time-zone database.
from zoneinfo import ZoneInfo
from datetime import datetime

CST = ZoneInfo("US/Central")
Load_TMS = datetime.now().astimezone(CST).strftime('%Y-%m-%d %H:%M:%S')

===========================================

# Input schema for the work file: every field is a nullable string.
input_file_read_schema = StructType(
    [
        StructField(field_name, StringType(), nullable=True)
        for field_name in ("ACCOUNT_ID", "OW_ID", "HQ_ID", "JB_ID")
    ]
)



===========WRITE FILE===========================

# Persist the final frame as a Delta table, replacing any previous output
# at struct_path.
(
    df_final_prism_repartition.write
    .format("delta")
    .mode("overwrite")
    .save(struct_path)
)

====================================


# Batch identifier: current US Central wall-clock time as compact yyyyMMddHHmmss.
BATCH_ID = f"{datetime.now().astimezone(CST):%Y%m%d%H%M%S}"

20240123021120

====================================


=============Clean unprintable characters from columns=====

# Disabled snippet, kept for reference: strips characters outside the
# \x01-\xFF range from four columns. It is "commented out" via a bare
# module-level string literal, which has no runtime effect when evaluated.
'''df = df.withColumn("CART_ID", regexp_replace("CART_ID", r"[^\x01-\xFF]", ""))\
  .withColumn("CORP_NM", regexp_replace("CORP_NM", r"[^\x01-\xFF]", ""))\
  .withColumn("SLS_CONTRCT_AUTHRZ_NBR", regexp_replace("SLS_CONTRCT_AUTHRZ_NBR", r"[^\x01-\xFF]", ""))\
    .withColumn("CONTRCT_CATG_NM", regexp_replace("CONTRCT_CATG_NM", r"[^\x01-\xFF]", ""))
'''


===============Remove all space from all column in dataframe=======

# Collapse every run of whitespace in every column to a single space.
# (Despite the section header, this normalises whitespace rather than
# removing it entirely.)
df = df.select(
    [regexp_replace(col(c), "\\s+", " ").alias(c) for c in df.columns]
)

  

======= is null otherwise===============================


# Split EDRA_ACTION_DATE into date and time parts, then flag whether the
# date part parses as MM/dd/yyyy (stgdateValidty = 1 when valid, else 0).
# Assumes EDRA_ACTION_DATE holds "<date> <time>" separated by a space —
# TODO confirm with the upstream feed.
# isNotNull() already yields a boolean column, so the redundant "== True"
# comparisons were removed.
df = df.withColumn("stgdate", when(trim(col("EDRA_ACTION_DATE")).isNotNull(), substring_index(trim(col("EDRA_ACTION_DATE")), ' ', 1)).otherwise(""))\
  .withColumn("stgtime", when(trim(col("EDRA_ACTION_DATE")).isNotNull(), substring_index(trim(col("EDRA_ACTION_DATE")), ' ', -1)).otherwise(""))\
  .withColumn("stgdateValidty", when((col("stgdate") != '') & to_date(col("stgdate"), "MM/dd/yyyy").isNotNull(), lit(1)).otherwise(lit(0)))

==withColumnRenamed===================================


# Trim source columns, rename them to target names, and add SCD
# bookkeeping columns (effective date, expiry sentinel, 'I' = insert).
# Fixes: the original ended with a dangling line continuation "\" after
# the last .withColumn, which is a syntax error when followed by a
# non-code line; the redundant "isNotNull() == True" comparisons were
# simplified to isNotNull().
df = df.withColumn("OW_ID", trim(df.OW_ID)).withColumnRenamed("OW_ID", "SLS_ACCT_ONEWRLD_ID")\
        .withColumn("HQ_ID", trim(df.HQ_ID)).withColumnRenamed("HQ_ID", "SLS_ACCT_HDQ_ID")\
        .withColumn("PRISM_ID", when(trim(col("PRISM_ID")).isNotNull(), trim(col("PRISM_ID"))).otherwise("")).withColumnRenamed("PRISM_ID", "CORP_ID")\
        .withColumn("COMMON_CORP_ID", when(trim(col("COMMON_CORP_ID")).isNotNull(), trim(col("COMMON_CORP_ID"))).otherwise("")).withColumnRenamed("COMMON_CORP_ID", "COMMON_CORP_ACCT_ID")\
        .withColumn("ACCOUNT_ID", trim(df.ACCOUNT_ID)).withColumnRenamed("ACCOUNT_ID", "SLS_HUB_INTRNL_ACCT_ID")\
        .withColumn("EDRA_REQUEST_TYPE", trim(df.EDRA_REQUEST_TYPE)).withColumnRenamed("EDRA_REQUEST_TYPE", "EDRA_RQST_TYPE_TXT")\
        .withColumn("JB_ID", trim(df.JB_ID)).withColumnRenamed("JB_ID", "JB_CORP_ID")\
        .withColumn("ROW_EFF_DT", current_date())\
        .withColumn("ROW_EXP_DT", lit('9999-12-31'))\
        .withColumn("ROW_STATUS_IND", lit('I'))

 ===================widgets ===================== 

 

# Notebook parameter widgets. A widget must be declared before
# dbutils.widgets.get() can read it; the original read 'PipelineName'
# without ever declaring it, which fails with InputWidgetNotDefined when
# the notebook is run directly (a job that passes the parameter may mask
# this — confirm against the job definition).
dbutils.widgets.text("adb_par", "", "")
dbutils.widgets.text('env', "", "")
dbutils.widgets.text('PipelineName', "", "")

PipelineName = dbutils.widgets.get('PipelineName')
env = dbutils.widgets.get('env')



====call other notebook============================

%run "../../config/corpd_parameterSetup"



============Run a query and get=========

# Guard against concurrent runs: count rows in the job-status table with
# STATUS = 'R' (running) for this pipeline's job name.
# NOTE(review): env and PipelineName are interpolated directly into the SQL
# text; they come from notebook widgets, so validate/escape them if the
# widgets can be set by untrusted callers.
status_chk = f"""select count(*) as v_not_empty from corpd_{env}_struct.corpd_job_status where STATUS = 'R' and JOB_NAME = '{PipelineName}'"""

print(status_chk)

# collect()[0][0] extracts the single count value (0 = no run in flight).
last_status_chk = spark.sql(status_chk).collect()[0][0]


=========HOW TO GET NOTEBOOK NAME===========================

# Resolve the path of the currently running notebook from the Databricks
# notebook context (None if unavailable).
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().getOrElse(None)

# The notebook name is the final path segment.
notebook_name = notebook_path.rsplit("/", 1)[-1]

print(notebook_name)


================================================

=======================TABLE to load in VIEW==================

##Mosaic JDBC info###

# Teradata JDBC connection settings.
driver = 'com.teradata.jdbc.TeraDriver'

jdbcURL = f"jdbc:teradata://{mosaic_cpd_server}"

sql_query_text = f"""Select * from cert_MOSAIC_DB.PRISM_CONTRCT_INFO"""

# Pull the query result through JDBC; credentials come from surrounding config.
jdbc_options = {
    'driver': driver,
    'url': jdbcURL,
    'query': sql_query_text,
    'user': mosaic_cpd_user,
    'password': mosaic_cpd_passwd,
}
df = spark.read.format('jdbc').options(**jdbc_options).load()

# Fully de-duplicate before exposing the data.
df = df.drop_duplicates()

# NOTE: count() forces a full evaluation of the frame.
print(df.count())

# Register a temp view so downstream SQL can query the data by name.
view_name = "PRISM_CONTRCT_INFO"

print(view_name)

df.createOrReplaceTempView(view_name)

============================================================

==================dropDuplicate()===========================


# De-duplicate accepted rows on the account-identity columns.
dedupe_columns = [
    'AccountAnalystName',
    'AccountMgrEmployeeID',
    'AccountMgrName',
    'AccountType',
    'Website',
    'TaxNumber',
]
df_accept = df_accept.dropDuplicates(dedupe_columns)



===========================Count in all coulms wise=================================   

# Find Count of Null, None, NaN of All DataFrame Columns

from pyspark.sql.functions import col, isnan, when, count

# Per-column null counts across the whole DataFrame.
df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
).show()

# Per-column non-null counts (count(col) skips nulls).
df.select([count(col(c)).alias(c) for c in df.columns]).show()



=====================String to date conversion=========


# Parse the MM/dd/yyyy string columns into DateType. A single to_date()
# already returns a DateType value; the original wrapped it in a second
# to_date(..., 'yyyy-MM-dd'), a redundant date -> string -> date round
# trip that yields the same dates.
df = df.withColumn("CONTRCT_EFF_DT", to_date("CONTRCT_EFF_DT", "MM/dd/yyyy"))
df = df.withColumn("CONTRCT_EXP_DT", to_date("CONTRCT_EXP_DT", "MM/dd/yyyy"))
display(df)



======================================================


AZURE AAD ServicePrincipalCredentials and ClientSecretCredential classes

 Both ServicePrincipalCredentials and ClientSecretCredential are classes in the azure-identity module of the Azure SDK for Python, 

which allow authenticating with Azure Active Directory (AAD) using various methods.

Here are the main differences between these two classes:


Authentication Method:-

ServicePrincipalCredentials supports several ways to authenticate with AAD, including certificate-based authentication,

interactive login via a web browser, and password-based authentication using a service principal. On the other hand, ClientSecretCredential only

supports password-based authentication using a service principal.


Constructor Parameters:-

To construct a ServicePrincipalCredentials object, you need to specify several pieces of information, including the tenant ID, client ID, 

and either a certificate file path or a username and password. By contrast, ClientSecretCredential only requires three constructor arguments: tenant ID, client ID, 

and client secret.



# Option 1: build service-principal credentials with
# azure.common.credentials.ServicePrincipalCredentials using client id,
# secret, and tenant (directory) id from the surrounding configuration.
print("Setting ServicePrincipalCredentials")

from azure.common.credentials import ServicePrincipalCredentials

spCredentials = ServicePrincipalCredentials(client_id=service_principle_clientid, secret=spSecret, tenant=service_principle_directory_id)


or 

# Option 2: azure-identity ClientSecretCredential — needs only tenant id,
# client id, and client secret. The original lines carried stray leading
# spaces, which raises IndentationError for top-level code; they are
# aligned to column 0 here.
from azure.identity import ClientSecretCredential

print("Setting spCredentials using ClientSecretCredential")
spCredentials = ClientSecretCredential(tenant_id=service_principle_directory_id, client_id=service_principle_clientid, client_secret=spSecret)


-


Tuesday, January 28, 2020

Model Repository Overview

Model Repository Overview
The Model repository is a relational database that stores the metadata for projects and folders.
Connect to the Model repository to create and edit physical data objects, mapping, profiles, and other objects. Include objects in an application, and then deploy the application to make the objects available for access by end users and third-party tools.
The following image shows an open Model repository named mrs1 in the Object Explorer view:






The Model Repository Service manages the Model repository. All client applications and application services that access the Model repository connect through the Model Repository Service. Client applications include the Developer tool and the Analyst tool. Informatica services that access the Model repository include the Model Repository Service, the Analyst Service, and the Data Integration Service.
When you set up the Developer tool, you must add a Model repository. Each time you open the Developer tool, you connect to the Model repository to access projects and folders.
When you edit an object, the Model repository locks the object for your exclusive editing. You can also integrate the Model repository with a third-party version control system. With version control system integration, you can check objects out and in, undo the checkout of objects, and view and retrieve historical versions of objects.

Informatica Data Quality and Profiling

Use the data quality capabilities in the Developer tool to analyze the content and structure of your data and enhance the data in ways that meet your business needs.
Use the Developer tool to design and run processes to complete the following tasks:
  • Profile data. Profiling reveals the content and structure of data. Profiling is a key step in any data project as it can identify strengths and weaknesses in data and help you define a project plan.
  • Create scorecards to review data quality. A scorecard is a graphical representation of the quality measurements in a profile.
  • Standardize data values. Standardize data to remove errors and inconsistencies that you find when you run a profile. You can standardize variations in punctuation, formatting, and spelling. For example, you can ensure that the city, state, and ZIP code values are consistent.
  • Parse data. Parsing reads a field composed of multiple values and creates a field for each value according to the type of information it contains. Parsing can also add information to records. For example, you can define a parsing operation to add units of measurement to product data.
  • Validate postal addresses. Address validation evaluates and enhances the accuracy and deliverability of postal address data. Address validation corrects errors in addresses and completes partial addresses by comparing address records against address reference data from national postal carriers. Address validation can also add postal information that speeds mail delivery and reduces mail costs.
  • Find duplicate records. Duplicate analysis calculates the degrees of similarity between records by comparing data from one or more fields in each record. You select the fields to be analyzed, and you select the comparison strategies to apply to the data. The Developer tool enables two types of duplicate analysis: field matching, which identifies similar or duplicate records, and identity matching, which identifies similar or duplicate identities in record data.
  • Manage exceptions. An exception is a record that contains data quality issues that you correct by hand. You can run a mapping to capture any exception record that remains in a data set after you run other data quality processes. You review and edit exception records in the Analyst tool.
  • Create reference data tables. Informatica provides reference data that can enhance several types of data quality process, including standardization and parsing. You can create reference tables using data from profile results.
  • Create and run data quality rules. Informatica provides rules that you can run or edit to meet your project objectives. You can create mapplets and validate them as rules in the Developer tool.
  • Collaborate with Informatica users. The Model repository stores reference data and rules, and this repository is available to users of the Developer tool and Analyst tool. Users can collaborate on projects, and different users can take ownership of objects at different stages of a project.
  • Export mappings to PowerCenter®. You can export and run mappings in PowerCenter. You can export mappings to PowerCenter to reuse the metadata for physical data integration or to create web services.

Informatica Nodes/Core Services/Gateway Nodes

Informatica Nodes/Core Services/Gateway Nodes

Service Manager:
The Service Manager is a service that manages all domain operations. It runs within Informatica Services. It runs as a service on Windows . When you start Informatica Services, you start the Service Manager. The Service Manager runs on each node. If the Service Manager is not running, the node is not available.
The Service Manager runs on all nodes in the domain to support the application services and the domain:
Application Services: Application services represent PowerCenter server-based functionality. Here are some application services in Informatica Power Center.
  • Integration Service
  • Repository Service
  • Reporting Service
  • Metadata Manager Service
  • SAP BW Service
  • Web Services Hub
  • Reference Table Manager Service

Required Services and DB Schemas for 10.1 IDQ Configuration



  • Required Services are:
    • Model repository service (MRS)
    • Data Integration Service
    • Analyst Service
    • Content Management Service
    • Data Director Service

    Required database Schemas are :
    • Database schema for Model repository (for MRS)
    • Database schema for data profiling  (for Analyst service)
IDQ Application Services and Schemas:

List of application services part of the Data Quality Standard Edition:
  1. Analyst Service
  2. Content Management Service
  3. Data Integration Service
  4. Model Repository Service
  5. Search Service

Set up the following databases:
  1. Model repository for the Model Repository Service.
  2. Data object cache database to cache logical data objects and virtual tables.
  3. Profiling warehouse to perform data profiling and discovery.
  4. Workflow database to store run-time metadata for workflows.
  5. Reference data warehouse to store reference table data for the Content Management Service.


Tuesday, October 17, 2017

TERA DATA PART1

--------------------------------------------------------------------------
Components of Teradata
--------------------------------------------------------------------------
Teradata is made up of following components –
Processor Chip – The processor is the BRAIN of the Teradata system. It is responsible for all the processing done by the system. All task are done according to the direction of the processor.
Memory – The memory is known as the HAND of the Teradata system. Data is retrieved from the hard drives into memory, where processor manipulates, change or alter the data. Once changes are made in memory, the processor directs the information back to the hard drive for storage.
Hard Drives – This is known as the SPINE of the Teradata system. All the data of the Teradata system is stored in the hard drives. Size of hard drives reflects the size of the Teradata system.

Teradata has Linear Scalability
One of the most important asset of Teradata is that it has Linear Scalability. There is no limit on Teradata system. We can grow it to as many times as we want. Any time you want to double the speed of Teradata system, just double the numbers of AMPs and PE. This can be better explained with the help of an example –
Teradata takes every table in the system and spread evenly among different AMPs. Each Amp works on the portion of records which it holds.
Suppose an EMPLOYEE table has 8 different employee IDs. Now in a 2 AMP system each AMP will hold 4 rows in its DISK to accommodate all 8 rows.

At the time of data retrieval each AMP will work on its DISK and send 4 rows to PE for further processing. If we suppose, one AMP will take 1 microseconds (MS) to retrieve 1 rows, then the time taken to retrieve 4 rows is 4 MS. And as we know that AMPs work in parallel, so both the AMPs will retrieve all 8 records in 4 MS only (4 MS time for each AMP).
Now we double the AMP in our system, and we use total 4 AMP. As Teradata distribute the records evenly among all AMPs, so now each AMP will store 2 records of the table.

--------------------------------------------------------------------------
SET or MULTISET tables:-
--------------------------------------------------------------------------
SET tables – SET tables do not allow duplicate values in the table.
MULTISET tables – MULTISET tables allow duplicate values in table.

CREATE VOLATILE TABLE temp
(
ID INT,
Name VARCHAR(20)
)
PRIMARY INDEX(ID)
ON COMMIT PRESERVE ROWS;


or
CREATE VOLATILE TABLE dept_stat (
   dept_no INTEGER,
   avg_salary INTEGER,
   max_salary INTEGER,
   min_salary INTEGER
)
PRIMARY INDEX(dept_no)
ON COMMIT PRESERVE ROWS;

show test;

select * from  temp;

insert into test values(1,'DINESH');
insert into test values(2,'LUCK');

---

create  index (id) on test1

create  unique index (id) on test1

HELP STATISTICS test1
-----------------------------------------------------------
*EXPLAIN command returns the execution plan of parsing engine in English*

EXPLAIN select * from test1
show table test

-----------------------------------------------------------------------------
Built-in Functions in Teradata
-----------------------------------------------------------------------------

select ACCOUNT;
select CURRENT_DATE;
select CURRENT_TIME;
select CURRENT_TIMESTAMP;
select DATABASE;
select date;
select session;
select time;
select user;


-----------------------------------------------------------------------------
How to get Case Sensitivity in Teradata
-----------------------------------------------------------------------------
Teradata by default is case insensitive. Whatever value you compare in the WHERE clause it does not match the exact case of that value.
So what if there is a scenario where you want to compare the value along with its specific case in the WHERE clause? Here comes the CASESPECIFIC keyword to the rescue. Just by adding this keyword after the value in the WHERE clause, we can force Teradata to impose case sensitivity on the checking condition.

select current_date where upper('teradata')='teradata'(CASESPECIFIC);---not return any date.

select current_date where lower('teradata')='teradata'(CASESPECIFIC);---return date.

SELECT emp_name FROM emp_table WHERE emp_name (CASESPECIFIC) LIKE 'John%' -- This query will return all names starting with capital J only
So just by using CASESPECIFIC we can force Teradata to check for case sensitivity
--------------------------------------
Primary Index in Teradata-
--------------------------------------
Each table in Teradata is required to have a primary index.Even if you did not define any primary index in CREATE table statement, the Teradata system will automatically create the primary index based on the setting of DBScontrol setting field . The primary index defines where data will reside and which AMP receives the row.
The three most important roles the primary index does is the following –
Data Distribution
Fastest way to retrieve Data
Incredibly important for Joins
In short, the primary index provides the fastest physical path to retrieving data.

2 type:-
A unique primary index means that the value for the selected column must be unique.
A Non unique primary index means that the value for the selected column can be non unique.

The primary index is the key to determine where the ROW of the table will reside on which AMP. When a new row arrive for insert in Teradata the following steps occur –
Teradata Parsing Engine (PE) examines the primary index of the row.
Teradata takes the primary index of the rows and run it through HASHING ALGORITHM.
The output of the Hashing Algorithm is the 32 bit Row – Hash value.

-----------------------------------------------------------------------------
Secondary Index in Teradata:--
-----------------------------------------------------------------------------
Secondary Indexes provide an alternate path to the data, and should be used on queries that run many times.

Syntax of creating Secondary Index
Syntax of UNIQUE SI:
CREATE UNIQUE INDEX (Column/Columns) ON .;
Syntax of NON-UNIQUE SI:
CREATE INDEX (Column/Columns) ON .;

Whenever you create SI on the table, Teradata will create a subtable on all AMP. This subtable contains three columns given below –
Secondary Index Value
Secondary Index Row ID (this is the hashed value of SI value)
Base table Row ID (this is the actual base row id  )


-----------------------------------------------------------------------------
Recursive Query in Teradata
-----------------------------------------------------------------------------
Using Recursive query data with in the single table can be scanned multiple times using a single SQL statement.
A recursive query is useful when relationships between data elements  exist in multiple iterations. Recursive query consist of two parts a seeding query and a recursive query.
Seeding query provides the initial records for which data needs to be iterated whereas Recursive query part helps in data iteration.


Recursive query syntax :

WITH RECURSIVE < recursive_tablename >
(,,….)
AS
(

UNION ALL

)


1) Recursive_tablename is a derived recursive table defined using WITH RECURSIVE keyword.
2) Column names of recursive table is specified as . This column names will be finally displayed in the output.
3) Seeding query ( query which is used as the input for recursive logic ) is mentioned as the first select statement.
4) Second select statement is known as the recursive query and is joined with seed table for multiple iterations.
5) Third select statement finally gives the output of recursive logic.

CREATE VOLATILE TABLE  Employee
(
EMP_ID INTEGER,
MGR_ID INTEGER,
EMP_NAME VARCHAR(100)
)
ON COMMIT PRESERVE ROWS;

-----------------------------------------------------------------------------------------------------------------------
ROLLUP Function:

-----------------------------------------------------------------------------------------------------------------------
ROLLUP is used to aggregate data along all the levels of hierarchy within a single dimension.
SELECT PRODUCT_ID
,SUM(QUANTITY)  AS TOTAL_QUANTITY
FROM STORE_QTY
GROUP BY ROLLUP  (PRODUCT_ID)
ORDER BY 1;
 roll2
Here along with all Product_Id you get a row which has ? in PRODUCT_ID column.
This ? Isn’t null but instead indicates the grand total of all the product_id.

Top SQL Commands
--------------------------------
--------------------------------

SQL to changing the default Database
Profile Keywords

DATABASE EMP_DATA_BASE;

SQL to find Information about a Database
HELP DATABASE EMP_DATA_BASE;

SQL to get Sample number of rows      
SELECT * FROM EMP_TBL SAMPLE 10;

SQL to get a sample Percentage of rows
SELECT * FROM EMP_TBL SAMPLE .50;

SQL to find information about a Table
SHOW TABLE EMP_TBL;

SQL to Use an Access Locked Table
LOCKING ROW FOR ACCESS SELECT * FROM EMP_TBL;

SQL Keywords that describe you
SELECT DATABASE, USER, SESSION,
ACCOUNT,
PROFILE,
ROLE;"
"SELECT DATE,
CURRENT_DATE,
TIME,
CURRENT_TIME,
CURRENT_TIMESTAMP;

SQL to Use Aggregates functions
SELECT TOP 10  STUDENT_NO, FIRST_NAME, LAST_NAME, CLASS_CODE
FROM STUDENT_TBL
ORDER BY GRADE DESC;"
"SELECT DEPT_NO
,MAX(SALARY) AS ""MAXIMUM""
,MIN(SALARY) AS ""MINIMUM""
,AVG(SALARY) AS ""AVERAGE""
,SUM(SALARY) AS ""SUM""
,COUNT(*) AS ""COUNT""
FROM EMP_TBL
GROUP BY DEPT_NO
ORDER BY DEPT_NO;

SQL to Select TOP Rows in a Rank Order
SELECT TOP 10  STUDENT_NO
,FIRST_NAME
,LAST_NAME
,CLASS_CODE
FROM STUDENT_TBL
ORDER BY GRADE DESC;

SQL Using Date, Time and Timestamp
SELECT
CALENDAR_DATE                
,DAY_OF_WEEK                  
,DAY_OF_MONTH                
,DAY_OF_YEAR                  
,DAY_OF_CALENDAR              
,WEEKDAY_OF_MONTH            
,WEEK_OF_MONTH                
,WEEK_OF_YEAR                
,WEEK_OF_CALENDAR            
,MONTH_OF_QUARTER            
,MONTH_OF_YEAR                
,MONTH_OF_CALENDAR            
,QUARTER_OF_YEAR              
,QUARTER_OF_CALENDAR          
,YEAR_OF_CALENDAR            
FROM SYS_CALENDAR.CALENDAR;


SQL to Find out how much Space a USER have
SELECT
USERNAME                    
,CREATORNAME                  
,PERMSPACE                    
,SPOOLSPACE                  
,TEMPSPACE                    
,LASTALTERNAME                
,LASTALTERTIMESTAMP          
FROM DBC.USERS
WHERE USERNAME='USER';

SQL to find how much Space left Per AMP in database
SELECT
VPROC                        
,DATABASENAME                
,ACCOUNTNAME                  
,MAXPERM
,MAXSPOOL                    
,MAXTEMP                      
 FROM DBC.DISKSPACE
WHERE DATABASENAME='EMP_DB'   ;        

SQL to finding USER Space
SELECT
 MAX(MAXPERM)
,MAX(MAXSPOOL)
,MAX(MAXTEMP)
 FROM DBC.DISKSPACE
WHERE DATABASENAME='USER' ;

SQL to find Space Skew in Tables in a Database
SELECT VPROC
,CAST(TABLENAME AS CHAR(20))
,CURRENTPERM
,PEAKPERM
FROM DBC.TABLESIZEV
WHERE DATABASENAME='USER'
ORDER BY TABLENAME, VPROC;

SQL to Find Table Skew
SELECT
TABLENAME,
SUM(CURRENTPERM) /(1024*1024) AS CURRENTPERM,
(100 - (AVG(CURRENTPERM)/MAX(CURRENTPERM)*100)) AS SKEWFACTOR
FROM
DBC.TABLESIZE
WHERE DATABASENAME=
AND TABLENAME =
GROUP BY 1;

SQL to Find AMP Skew
SELECT DATABASENAME
,TABLENAME
,VPROC
,CURRENTPERM
,PEAKPERM
FROM DBC.TABLESIZE
WHERE
DATABASENAME=
AND
TABLENAME=
ORDER BY VPROC ;

SQL to find number of rows per AMP for a Column
SELECT HASHAMP(HASHBUCKET( HASHROW(EMP_NO))) AS ""AMP"" , COUNT(*)
FROM EMP_TABLE
GROUP BY 1
ORDER BY 1;

SQL to Identify  duplicate records
SELECT COLUMN1, COLUMN2, COLUMN3, COUNT(*)
FROM
DATABASE.TABLE
GROUP BY COLUMN1, COLUMN2, COLUMN3
HAVING COUNT(*) >1;

SQL to Delete Duplicate records
CREATE TABLE TABLE1_BACKUP AS (SELECT * FROM TABLE1 QUALIFY ROW_NUMBER() OVER (PARTITION BY COLUMN1 ORDER BY COLUMN1 DESC )=1) WITH DATA;
DELETE FROM TABLE1;
INSERT INTO TABLE1 SELECT * FROM TABLE_BACKUP;

SQL below to find TOP Databases by space occupied
SELECT
DatabaseName
,MAX(CurrentPerm) * (HASHAMP()+1)/1024/1024 AS USEDSPACE_IN_MB
FROM DBC.DiskSpace
GROUP BY DatabaseName
ORDER BY USEDSPACE_IN_MB DESC;

SQL to find TOP Tables by space occupied
SELECT DATABASENAME
,TABLENAME
,SUM(CurrentPerm)/1024/1024 AS TABLESIZE_IN_MB
FROM DBC.TableSize
GROUP BY DATABASENAME,TABLENAME
ORDER BY TABLESIZE_IN_MB DESC;

SQL to find out list of nodes
SELECT DISTINCT NODEID FROM DBC.RESUSAGESPMA;

SQL to find Account Information
SELECT * FROM DBC.AccountInfoV ORDER BY 1;

Friday, September 22, 2017

Performance Tuning the Siebel Change Capture Process in DAC

Performance Tuning the Siebel Change Capture Process in DAC

DAC performs the change capture process for Siebel source systems. This process has two components:

1)The change capture process occurs before any task in an ETL process runs.

2)The change capture sync process occurs after all of the tasks in an ETL process have completed successfully.

Supporting Source Tables:-

The source tables that support the change capture process are as follows:

     S_ETL_I_IMG. Used to store the primary key (along with MODIFICATION_NUM and LAST_UPD) of the records that were either created or modified since the time of the last ETL.

     S_ETL_R_IMG. Used to store the primary key (along with MODIFICATION_NUM and LAST_UPD) of the records that were loaded into the data warehouse for the prune time period.

     S_ETL_D_IMG. Used to store the primary key records that are deleted on the source transactional system.

Full and Incremental Change Capture Processes:-

The full change capture process (for a first full load) does the following:

    Inserts records into the S_ETL_R_IMG table, which has been created or modified for the prune time period.

   Creates a view on the base table. For example, a view V_CONTACT would be created for the base table S_CONTACT.

The incremental change capture process (for subsequent incremental loads) does the following:

   Queries for all records that have changed in the transactional tables since the last ETL date, filters them against the records from the R_IMG table, and inserts them into the S_ETL_I_IMG table.

   Queries for all records that have been deleted from the S_ETL_D_IMG table and inserts them into the S_ETL_I_IMG table.

Removes the duplicates in the S_ETL_I_IMG table. This is essential for all the databases where "dirty reads" (queries returning uncommitted data from all transactions) are allowed.

Creates a view that joins the base table with the corresponding S_ETL_I_IMG table.


Performance Tips for Siebel Sources:-

Performance Tip: Reduce Prune Time Period:-

Reducing the prune time period (in the Connectivity Parameters subtab of the Execution Plans tab) can improve performance, because with a lower prune time period, the S_ETL_R_IMG table will contain a fewer

number of rows. The default prune time period is 2 days. You can reduce it to a minimum of 1 day.

Note: If your organization has mobile users, when setting the prune time period, you must consider the lag time that may exist between the timestamp of the transactional system and the mobile users' local timestamp. You

should interview your business users to determine the potential lag time, and then set the prune time period accordingly.


Performance Tip: Eliminate S_ETL_R_IMG From the Change Capture Process

If your Siebel implementation does not have any mobile users (which can cause inaccuracies in the values of the "LAST_UPD" attribute), you can simplify the change capture process by doing the following:

Removing the S_ETL_R_IMG table.

Using the LAST_REFRESH_DATE rather than PRUNED_LAST_REFRESH_DATE.

To override the default DAC behavior, add the following SQL to the customsql.xml file before the last line in the file, which reads as
. The customsql.xml file is located in the dac\CustomSQLs directory.


TRUNCATE TABLE S_ETL_I_IMG_%SUFFIX
;
TRUNCATE TABLE S_ETL_D_IMG_%SUFFIX
;



TRUNCATE TABLE S_ETL_I_IMG_%SUFFIX
;
INSERT %APPEND INTO S_ETL_I_IMG_%SUFFIX
     (ROW_ID, MODIFICATION_NUM, OPERATION, LAST_UPD)
     SELECT
          ROW_ID
          ,MODIFICATION_NUM
          ,'I'
          ,LAST_UPD
     FROM
          %SRC_TABLE
     WHERE
          %SRC_TABLE.LAST_UPD > %LAST_REFRESH_TIME
          %FILTER
INSERT %APPEND INTO S_ETL_I_IMG_%SUFFIX
     (ROW_ID, MODIFICATION_NUM, OPERATION, LAST_UPD)
     SELECT
          ROW_ID
          ,MODIFICATION_NUM
          ,'D'
          ,LAST_UPD
     FROM
          S_ETL_D_IMG_%SUFFIX
     WHERE NOT EXISTS
     (
          SELECT
                 'X'
          FROM
                 S_ETL_I_IMG_%SUFFIX
          WHERE
                 S_ETL_I_IMG_%SUFFIX.ROW_ID = S_ETL_D_IMG_%SUFFIX.ROW_ID
          )
;



DELETE
FROM S_ETL_D_IMG_%SUFFIX
WHERE
     EXISTS
     (SELECT
                 'X'
     FROM
                 S_ETL_I_IMG_%SUFFIX
     WHERE
                 S_ETL_D_IMG_%SUFFIX .ROW_ID = S_ETL_I_IMG_%SUFFIX.ROW_ID
                 AND S_ETL_I_IMG_%SUFFIX.OPERATION = 'D'
     )
;




Performance Tip: Omit the Process to Eliminate Duplicate Records :-

When the Siebel change capture process runs on live transactional systems, it can run into deadlock issues when DAC queries for the records that changed since the last ETL process. To alleviate this problem, you need to

enable "dirty reads" on the machine where the ETL is run. If the transactional system is on a database that requires "dirty reads" for change capture, such as MSSQL, DB2, or DB2-390, it is possible that the record

identifier columns (ROW_ID) inserted in the S_ETL_I_IMG table may have duplicates. Before starting the ETL process, DAC eliminates such duplicate records so that only the record with the smallest

MODIFICATION_NUM is kept. The SQL used by DAC is as follows:


SELECT
     ROW_ID, LAST_UPD, MODIFICATION_NUM
FROM
     S_ETL_I_IMG_%SUFFIX A
WHERE EXISTS
     (
     SELECT B.ROW_ID, COUNT(*)  FROM S_ETL_I_IMG_%SUFFIX B
               WHERE B.ROW_ID = A.ROW_ID
                 AND B.OPERATION = 'I'
                 AND A.OPERATION = 'I'
     GROUP BY
         B.ROW_ID
     HAVING COUNT(*) > 1
     )
AND A.OPERATION = 'I'
ORDER BY 1,2

However, for situations where deadlocks and "dirty reads" are not an issue, you can omit the process that detects the duplicate records by using the following SQL block. Copy the SQL block into the customsql.xml file

before the last line in the file (the file's closing XML tag). The customsql.xml file is located in the dac\CustomSQLs directory.

SELECT
     ROW_ID, LAST_UPD, MODIFICATION_NUM
FROM
     S_ETL_I_IMG_%SUFFIX A
WHERE 1=2

Performance Tip: Omit the Process to Eliminate Duplicate Records
When the Siebel change capture process runs on live transactional systems, it can run into deadlock issues when DAC queries for the records that changed since the last ETL process. To alleviate this problem, you need to

enable "dirty reads" on the machine where the ETL is run. If the transactional system is on a database that requires "dirty reads" for change capture, such as MSSQL, DB2, or DB2-390, it is possible that the record

identifier columns (ROW_ID) inserted in the S_ETL_I_IMG table may have duplicates. Before starting the ETL process, DAC eliminates such duplicate records so that only the record with the smallest

MODIFICATION_NUM is kept. The SQL used by DAC is as follows:


SELECT
     ROW_ID, LAST_UPD, MODIFICATION_NUM
FROM
     S_ETL_I_IMG_%SUFFIX A
WHERE EXISTS
     (
     SELECT B.ROW_ID, COUNT(*)  FROM S_ETL_I_IMG_%SUFFIX B
               WHERE B.ROW_ID = A.ROW_ID
                 AND B.OPERATION = 'I'
                 AND A.OPERATION = 'I'
     GROUP BY
         B.ROW_ID
     HAVING COUNT(*) > 1
     )
AND A.OPERATION = 'I'
ORDER BY 1,2

However, for situations where deadlocks and "dirty reads" are not an issue, you can omit the process that detects the duplicate records by using the following SQL block. Copy the SQL block into the customsql.xml file

before the last line in the file (the file's closing XML tag). The customsql.xml file is located in the dac\CustomSQLs directory.

SELECT
     ROW_ID, LAST_UPD, MODIFICATION_NUM
FROM
     S_ETL_I_IMG_%SUFFIX A
WHERE 1=2


Performance Tip: Manage Change Capture Views

DAC drops and creates the incremental views for every ETL process. This is done because DAC anticipates that the transactional system may add new columns on tables to track new attributes in the data warehouse. If you

do not anticipate such changes in the production environment, you can set the DAC system property "Drop and Create Change Capture Views Always" to "false" so that DAC will not drop and create incremental views. On

DB2 and DB2-390 databases, dropping and creating views can cause deadlock issues on the system catalog tables. Therefore, if your transactional database type is DB2 or DB2-390, you may want to consider setting the

DAC system property "Drop and Create Change Capture Views Always" to "false." For other database types, this action may not enhance performance.

Note: If new columns are added to the transactional system and the ETL process is modified to extract data from those columns, and if views are not dropped and created, you will not see the new column definitions in the

view, and the ETL process will fail.

Performance Tip: Determine Whether Informatica Filters on Additional Attributes

DAC populates the S_ETL_I_IMG tables by querying only for data that changed since the last ETL process. This may cause all of the records that were created or updated since the last refresh time to be extracted.

However, the extract processes in Informatica may be filtering on additional attributes. Therefore, for long-running change capture tasks, you should inspect the Informatica mapping to see if it has additional WHERE

clauses not present in the DAC change capture process. You can modify the DAC change capture process by adding a filter clause for a <table name>.<column name> combination in the ChangeCaptureFilter.xml file, which


is located in the dac\CustomSQLs directory.

SQL for Change Capture and Change Capture Sync Processes

The SQL blocks used for the change capture and change capture sync processes are as follows:


TRUNCATE TABLE S_ETL_I_IMG_%SUFFIX
;
TRUNCATE TABLE S_ETL_R_IMG_%SUFFIX
;
TRUNCATE TABLE S_ETL_D_IMG_%SUFFIX
;
INSERT %APPEND INTO S_ETL_R_IMG_%SUFFIX
     (ROW_ID, MODIFICATION_NUM, LAST_UPD)
     SELECT
          ROW_ID
          ,MODIFICATION_NUM
          ,LAST_UPD
     FROM
          %SRC_TABLE
     WHERE
          LAST_UPD > %PRUNED_ETL_START_TIME
          %FILTER
;



TRUNCATE TABLE S_ETL_I_IMG_%SUFFIX
;
INSERT %APPEND INTO S_ETL_I_IMG_%SUFFIX
     (ROW_ID, MODIFICATION_NUM, OPERATION, LAST_UPD)
     SELECT
          ROW_ID
          ,MODIFICATION_NUM
          ,'I'
          ,LAST_UPD
     FROM
          %SRC_TABLE
     WHERE
          %SRC_TABLE.LAST_UPD > %PRUNED_LAST_REFRESH_TIME
          %FILTER
          AND NOT EXISTS
          (
          SELECT
                  ROW_ID
                  ,MODIFICATION_NUM
                  ,'I'
                  ,LAST_UPD
          FROM
                  S_ETL_R_IMG_%SUFFIX
          WHERE
                  S_ETL_R_IMG_%SUFFIX.ROW_ID = %SRC_TABLE.ROW_ID
                  AND S_ETL_R_IMG_%SUFFIX.MODIFICATION_NUM = %SRC_TABLE.MODIFICATION_NUM
                  AND S_ETL_R_IMG_%SUFFIX.LAST_UPD = %SRC_TABLE.LAST_UPD
                )
;
INSERT %APPEND INTO S_ETL_I_IMG_%SUFFIX
          (ROW_ID, MODIFICATION_NUM, OPERATION, LAST_UPD)
          SELECT
                  ROW_ID
                  ,MODIFICATION_NUM
                  ,'D'
                  ,LAST_UPD
          FROM
                  S_ETL_D_IMG_%SUFFIX
          WHERE NOT EXISTS
          (
                  SELECT
                           'X'
                  FROM
                           S_ETL_I_IMG_%SUFFIX
                  WHERE
                            S_ETL_I_IMG_%SUFFIX.ROW_ID = S_ETL_D_IMG_%SUFFIX.ROW_ID
          )
;



DELETE
FROM S_ETL_D_IMG_%SUFFIX
WHERE
          EXISTS
          (
          SELECT
                    'X'
          FROM
                    S_ETL_I_IMG_%SUFFIX
          WHERE
                    S_ETL_D_IMG_%SUFFIX.ROW_ID = S_ETL_I_IMG_%SUFFIX.ROW_ID
                    AND S_ETL_I_IMG_%SUFFIX.OPERATION = 'D'
          )
;
DELETE
FROM S_ETL_I_IMG_%SUFFIX
WHERE LAST_UPD < %PRUNED_ETL_START_TIME
;
DELETE
FROM S_ETL_I_IMG_%SUFFIX
WHERE LAST_UPD > %ETL_START_TIME
;
DELETE
FROM S_ETL_R_IMG_%SUFFIX
WHERE
          EXISTS
          (
           SELECT
                    'X'
          FROM
                    S_ETL_I_IMG_%SUFFIX
          WHERE
                    S_ETL_R_IMG_%SUFFIX.ROW_ID = S_ETL_I_IMG_%SUFFIX.ROW_ID
          )
;
INSERT %APPEND INTO S_ETL_R_IMG_%SUFFIX
          (ROW_ID, MODIFICATION_NUM, LAST_UPD)
          SELECT
                  ROW_ID
                  ,MODIFICATION_NUM
                  ,LAST_UPD
          FROM
                  S_ETL_I_IMG_%SUFFIX
;
DELETE FROM S_ETL_R_IMG_%SUFFIX WHERE LAST_UPD < %PRUNED_ETL_START_TIME
;





Data engineering Interview Questions

1)  What all challenges you have faced and how did you overcome from it? Ans:- Challenges Faced and Overcome As a hypothetical Spark develop...