Thursday, March 28, 2024

DATABRICKS PySpark Important

 ================DUPLICATE===============

# Define a Window specification
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

window_spec = Window.partitionBy(*columns_to_check).orderBy("SLS_HUB_INTRNL_ACCT_ID")

# Use rank() to flag duplicates, then keep only the flagged rows as the reject set
df_final_prism_repartition_reject = df_final_prism_repartition\
  .withColumn("is_duplicate", F.rank().over(window_spec) > 1)\
  .filter(col("is_duplicate"))
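
The complementary "accept" set can be built over the same window; a minimal sketch, assuming the goal is to keep only the first row per duplicate group (the df_final_prism_repartition_accept name is illustrative):

# Keep one row per duplicate group using row_number() over the same window
df_final_prism_repartition_accept = df_final_prism_repartition\
  .withColumn("rn", F.row_number().over(window_spec))\
  .filter(col("rn") == 1)\
  .drop("rn")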




===========READING FILE ===========


df = spark.read.format("csv")\

  .option("header",True)\

    .option("delimiter", "~")\

      .option("quote", "\"")\

        .option("encoding","ISO-8859-1")\

          .option("dateFormat","MM/dd/yyyy")\

            .schema(input_file_read_schema)\

              .load(work_file_path)

  

=================#Current CST Time===============

# Current CST Time
import pytz
from datetime import datetime

CST = pytz.timezone('US/Central')
Load_TMS = datetime.now().astimezone(CST).strftime('%Y-%m-%d %H:%M:%S')
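
Load_TMS is typically stamped onto the DataFrame as an audit column; a minimal sketch (the LOAD_TMS column name is illustrative):

from pyspark.sql.functions import lit

# Add the load timestamp to every row as a literal string column
df = df.withColumn("LOAD_TMS", lit(Load_TMS))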

===========================================

from pyspark.sql.types import StructType, StructField, StringType

input_file_read_schema = StructType([
    StructField("ACCOUNT_ID", StringType(), nullable=True),
    StructField("OW_ID", StringType(), nullable=True),
    StructField("HQ_ID", StringType(), nullable=True),
    StructField("JB_ID", StringType(), nullable=True)
])



===========WRITE FILE===========================

df_final_prism_repartition.write\
  .format("delta")\
  .mode("overwrite")\
  .save(struct_path)
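
To confirm the write, the Delta output can be read back from the same path; a minimal sketch:

# Read the Delta table back and verify the row count
df_check = spark.read.format("delta").load(struct_path)
print(df_check.count())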

====================================


BATCH_ID = datetime.now().astimezone(CST).strftime('%Y%m%d%H%M%S')
# e.g. 20240123021120

====================================


=============Clean unprintable characters from columns=====

'''df = df.withColumn("CART_ID", regexp_replace("CART_ID", r"[^\x01-\xFF]", ""))\

  .withColumn("CORP_NM", regexp_replace("CORP_NM", r"[^\x01-\xFF]", ""))\

  .withColumn("SLS_CONTRCT_AUTHRZ_NBR", regexp_replace("SLS_CONTRCT_AUTHRZ_NBR", r"[^\x01-\xFF]", ""))\

    .withColumn("CONTRCT_CATG_NM", regexp_replace("CONTRCT_CATG_NM", r"[^\x01-\xFF]", ""))

'''
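
If every column needs the same cleanup, a loop avoids repeating withColumn per column; a minimal sketch, assuming all columns are string-typed:

from pyspark.sql.functions import regexp_replace, col

# Strip characters outside \x01-\xFF from every column
for c in df.columns:
    df = df.withColumn(c, regexp_replace(col(c), r"[^\x01-\xFF]", ""))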


===============Collapse repeated whitespace in all columns=======

# Replace each run of whitespace with a single space in every column
for columnName in df.columns:
  df = df.withColumn(columnName, regexp_replace(col(columnName), "\\s+", " "))

  

======= is null otherwise===============================


df = df.withColumn("stgdate", when(trim(col("EDRA_ACTION_DATE")).isNotNull() == True, substring_index(trim(col("EDRA_ACTION_DATE")),' ',1)).otherwise(""))\

  .withColumn("stgtime", when(trim(col("EDRA_ACTION_DATE")).isNotNull() == True, substring_index(trim(col("EDRA_ACTION_DATE")),' ',-1)).otherwise(""))\

  .withColumn("stgdateValidty", when((col("stgdate") != '') & to_date(col("stgdate"),"MM/dd/yyyy").isNotNull(),lit(1)).otherwise(lit(0)))

==withColumnRenamed===================================


df = df.withColumn("OW_ID", trim(df.OW_ID)).withColumnRenamed("OW_ID","SLS_ACCT_ONEWRLD_ID")\

        .withColumn("HQ_ID", trim(df.HQ_ID)).withColumnRenamed("HQ_ID","SLS_ACCT_HDQ_ID")\

        .withColumn("PRISM_ID", when(trim(col("PRISM_ID")).isNotNull() == True, trim(col("PRISM_ID"))).otherwise("")).withColumnRenamed("PRISM_ID","CORP_ID")\

        .withColumn("COMMON_CORP_ID", when(trim(col("COMMON_CORP_ID")).isNotNull() == True, trim(col("COMMON_CORP_ID"))).otherwise("")).withColumnRenamed("COMMON_CORP_ID","COMMON_CORP_ACCT_ID")\

        .withColumn("ACCOUNT_ID", trim(df.ACCOUNT_ID)).withColumnRenamed("ACCOUNT_ID","SLS_HUB_INTRNL_ACCT_ID")\

        .withColumn("EDRA_REQUEST_TYPE", trim(df.EDRA_REQUEST_TYPE)).withColumnRenamed("EDRA_REQUEST_TYPE","EDRA_RQST_TYPE_TXT")\

        .withColumn("JB_ID", trim(df.JB_ID)).withColumnRenamed("JB_ID","JB_CORP_ID")\

        .withColumn("ROW_EFF_DT", current_date())\

        .withColumn("ROW_EXP_DT", lit('9999-12-31'))\

        .withColumn("ROW_STATUS_IND", lit('I'))\

 ===================widgets ===================== 

 

dbutils.widgets.text("adb_par", "","")

dbutils.widgets.text('env', "","")


PipelineName = dbutils.widgets.get('PipelineName')

env = dbutils.widgets.get('env')



====call other notebook============================

%run "../../config/corpd_parameterSetup"



============Run a query and get a scalar result=========

status_chk = f"""select count(*) as v_not_empty from corpd_{env}_struct.corpd_job_status where STATUS = 'R' and JOB_NAME = '{PipelineName}'"""

print(status_chk)

last_status_chk = spark.sql(status_chk).collect()[0][0]
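
One way the count might be used is to stop the run when the job is already in a running state; a minimal sketch (the exit message is illustrative):

# Abort this run if a previous execution of the same job is still marked 'R' (running)
if last_status_chk > 0:
    dbutils.notebook.exit(f"{PipelineName} is already running; exiting.")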


=========HOW TO GET NOTEBOOK NAME===========================

notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().getOrElse(None)


# Extract the notebook name from the path

notebook_name = notebook_path.split("/")[-1]

print(notebook_name)


================================================

=======================Load a TABLE into a temp VIEW==================

##Mosaic JDBC info###

driver = 'com.teradata.jdbc.TeraDriver'

jdbcURL = f"jdbc:teradata://{mosaic_cpd_server}"


sql_query_text = f"""Select * from cert_MOSAIC_DB.PRISM_CONTRCT_INFO"""


df = spark.read \
      .format('jdbc') \
      .option('driver', driver) \
      .option('url', jdbcURL) \
      .option('query', sql_query_text) \
      .option('user', mosaic_cpd_user) \
      .option('password', mosaic_cpd_passwd) \
      .load()


df = df.drop_duplicates()


print(df.count())

view_name = "PRISM_CONTRCT_INFO"

print(view_name)

df.createOrReplaceTempView(view_name)
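
Once registered, the temp view can be queried with Spark SQL like any other table; a minimal sketch:

# Query the temp view created above
spark.sql(f"select count(*) as row_cnt from {view_name}").show()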

============================================================

==================dropDuplicate()===========================


df_accept = df_accept.dropDuplicates(['AccountAnalystName',
                                      'AccountMgrEmployeeID',
                                      'AccountMgrName',
                                      'AccountType',
                                      'Website',
                                      'TaxNumber'])



===========================Null count column-wise=================================

# Find count of Null/None values in every DataFrame column
from pyspark.sql.functions import col, isnan, when, count

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Count of non-null values per column
df.select([count(col(c)).alias(c) for c in df.columns]).show()
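
For float/double columns, NaN values are not caught by isNull(); a minimal sketch combining both checks for the floating-point columns only:

# Count rows that are either null or NaN, per floating-point column
numeric_cols = [f.name for f in df.schema.fields if f.dataType.simpleString() in ("float", "double")]
df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in numeric_cols]).show()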



=====================String to date conversion=========


df=df.withColumn("CONTRCT_EFF_DT",to_date(to_date("CONTRCT_EFF_DT","MM/dd/yyyy"),'yyyy-MM-dd'))

df=df.withColumn("CONTRCT_EXP_DT",to_date(to_date("CONTRCT_EXP_DT","MM/dd/yyyy"),'yyyy-MM-dd'))

display(df)
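
to_date returns null when a value does not match the given pattern, so a quick null count flags unparseable dates; a minimal sketch:

# Rows where the effective date failed to parse
print(df.filter(col("CONTRCT_EFF_DT").isNull()).count())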



======================================================


AZURE AAD ServicePrincipalCredentials and ClientSecretCredential classes

Both ServicePrincipalCredentials and ClientSecretCredential authenticate a service principal against Azure Active Directory (AAD),
but they come from different generations of the Azure SDK for Python: ServicePrincipalCredentials lives in the legacy
azure.common.credentials module, while ClientSecretCredential is part of the newer azure-identity package.

Here are the main differences between these two classes:

Authentication library:-

ServicePrincipalCredentials belongs to the older msrestazure-based authentication stack, is used with the older management SDK clients,
and is deprecated in favour of azure-identity. ClientSecretCredential implements the azure-identity TokenCredential protocol,
authenticates a service principal with a client secret, and is the recommended option for current-generation SDK clients.

Constructor Parameters:-

ServicePrincipalCredentials takes the client ID, the client secret, and the tenant ID (client_id, secret, tenant).
ClientSecretCredential also takes three arguments, but with different names: tenant_id, client_id, and client_secret.



print("Setting ServicePrincipalCredentials")

from azure.common.credentials import ServicePrincipalCredentials

spCredentials = ServicePrincipalCredentials(client_id=service_principle_clientid, secret=spSecret, tenant=service_principle_directory_id)


or 

 from azure.identity import ClientSecretCredential

  print("Setting spCredentials using ClientSecretCredential")

 spCredentials = ClientSecretCredential(tenant_id=service_principle_directory_id, client_id=service_principle_clientid, client_secret=spSecret)
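
As one illustration, a ClientSecretCredential can be passed to current-generation data-plane clients such as Key Vault's SecretClient; a minimal sketch (the vault name and secret name are placeholders):

from azure.keyvault.secrets import SecretClient

# ClientSecretCredential satisfies the TokenCredential protocol expected by SecretClient
secret_client = SecretClient(vault_url="https://<your-key-vault-name>.vault.azure.net", credential=spCredentials)
example_secret = secret_client.get_secret("<your-secret-name>")
print(example_secret.name)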



