================DUPLICATE===============
# Define a Window specification over the duplicate-check columns
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
window_spec = Window.partitionBy(*columns_to_check).orderBy("SLS_HUB_INTRNL_ACCT_ID")
# Use rank() to flag every row after the first in each partition as a duplicate
df_final_prism_repartition_reject = df_final_prism_repartition\
    .withColumn("is_duplicate", F.rank().over(window_spec) > 1)\
    .filter(col("is_duplicate"))
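# Minimal companion sketch (assumption: same window columns): keep only the first row per group as the "accept" set
df_final_prism_repartition_accept = df_final_prism_repartition\
    .withColumn("rn", F.row_number().over(window_spec))\
    .filter(col("rn") == 1)\
    .drop("rn")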
===========READING FILE ===========
df = spark.read.format("csv")\
    .option("header", True)\
    .option("delimiter", "~")\
    .option("quote", "\"")\
    .option("encoding", "ISO-8859-1")\
    .option("dateFormat", "MM/dd/yyyy")\
    .schema(input_file_read_schema)\
    .load(work_file_path)
=================Current CST Time===============
# Current CST time
import pytz
from datetime import datetime

CST = pytz.timezone('US/Central')
Load_TMS = datetime.now().astimezone(CST).strftime('%Y-%m-%d %H:%M:%S')
===========================================
from pyspark.sql.types import StructType, StructField, StringType

input_file_read_schema = StructType([
    StructField("ACCOUNT_ID", StringType(), nullable=True),
    StructField("OW_ID", StringType(), nullable=True),
    StructField("HQ_ID", StringType(), nullable=True),
    StructField("JB_ID", StringType(), nullable=True)
])
===========WRITE FILE===========================
df_final_prism_repartition.write\
    .format("delta")\
    .mode("overwrite")\
    .save(struct_path)
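# Usage sketch: the Delta output written above can be read back from the same path
df_struct = spark.read.format("delta").load(struct_path)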
====================================
BATCH_ID = datetime.now().astimezone(CST).strftime('%Y%m%d%H%M%S')
# Example value: 20240123021120
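# Hypothetical usage sketch: stamp output artifacts with the batch id (the file name below is an assumption, not from the pipeline)
reject_file_name = f"prism_reject_{BATCH_ID}.csv"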
====================================
=============Clean unprintable characters from columns=====
# Kept commented out: regexp_replace strips any character outside \x01-\xFF (i.e. non Latin-1 characters)
'''df = df.withColumn("CART_ID", regexp_replace("CART_ID", r"[^\x01-\xFF]", ""))\
    .withColumn("CORP_NM", regexp_replace("CORP_NM", r"[^\x01-\xFF]", ""))\
    .withColumn("SLS_CONTRCT_AUTHRZ_NBR", regexp_replace("SLS_CONTRCT_AUTHRZ_NBR", r"[^\x01-\xFF]", ""))\
    .withColumn("CONTRCT_CATG_NM", regexp_replace("CONTRCT_CATG_NM", r"[^\x01-\xFF]", ""))
'''
===============Normalize whitespace in all columns of a dataframe=======
# Collapses runs of whitespace to a single space; use "" as the replacement to remove spaces entirely
for columnName in df.columns:
    df = df.withColumn(columnName, regexp_replace(col(columnName), "\\s+", " "))
======= isNull / otherwise===============================
# Split EDRA_ACTION_DATE into date and time parts, then flag rows whose date part parses as MM/dd/yyyy
df = df.withColumn("stgdate", when(trim(col("EDRA_ACTION_DATE")).isNotNull(), substring_index(trim(col("EDRA_ACTION_DATE")), ' ', 1)).otherwise(""))\
    .withColumn("stgtime", when(trim(col("EDRA_ACTION_DATE")).isNotNull(), substring_index(trim(col("EDRA_ACTION_DATE")), ' ', -1)).otherwise(""))\
    .withColumn("stgdateValidty", when((col("stgdate") != '') & to_date(col("stgdate"), "MM/dd/yyyy").isNotNull(), lit(1)).otherwise(lit(0)))
==withColumnRenamed===================================
df = df.withColumn("OW_ID", trim(df.OW_ID)).withColumnRenamed("OW_ID","SLS_ACCT_ONEWRLD_ID")\
.withColumn("HQ_ID", trim(df.HQ_ID)).withColumnRenamed("HQ_ID","SLS_ACCT_HDQ_ID")\
.withColumn("PRISM_ID", when(trim(col("PRISM_ID")).isNotNull() == True, trim(col("PRISM_ID"))).otherwise("")).withColumnRenamed("PRISM_ID","CORP_ID")\
.withColumn("COMMON_CORP_ID", when(trim(col("COMMON_CORP_ID")).isNotNull() == True, trim(col("COMMON_CORP_ID"))).otherwise("")).withColumnRenamed("COMMON_CORP_ID","COMMON_CORP_ACCT_ID")\
.withColumn("ACCOUNT_ID", trim(df.ACCOUNT_ID)).withColumnRenamed("ACCOUNT_ID","SLS_HUB_INTRNL_ACCT_ID")\
.withColumn("EDRA_REQUEST_TYPE", trim(df.EDRA_REQUEST_TYPE)).withColumnRenamed("EDRA_REQUEST_TYPE","EDRA_RQST_TYPE_TXT")\
.withColumn("JB_ID", trim(df.JB_ID)).withColumnRenamed("JB_ID","JB_CORP_ID")\
.withColumn("ROW_EFF_DT", current_date())\
.withColumn("ROW_EXP_DT", lit('9999-12-31'))\
.withColumn("ROW_STATUS_IND", lit('I'))\
===================widgets =====================
dbutils.widgets.text("adb_par", "","")
dbutils.widgets.text('env', "","")
PipelineName = dbutils.widgets.get('PipelineName')
env = dbutils.widgets.get('env')
====call other notebook============================
%run "../../config/corpd_parameterSetup"
============Run a query and get the first value=========
status_chk = f"""select count(*) as v_not_empty from corpd_{env}_struct.corpd_job_status where STATUS = 'R' and JOB_NAME = '{PipelineName}'"""
print(status_chk)
last_status_chk = spark.sql(status_chk).collect()[0][0]
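# Hypothetical usage sketch: bail out if another run of this job is already marked 'R' (running)
if last_status_chk > 0:
    dbutils.notebook.exit(f"{PipelineName} is already running; exiting.")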
=========HOW TO GET NOTEBOOK NAME===========================
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().getOrElse(None)
# Extract the notebook name from the path
notebook_name = notebook_path.split("/")[-1]
print(notebook_name)
================================================
=======================Load a TABLE into a VIEW==================
##Mosaic JDBC info###
driver = 'com.teradata.jdbc.TeraDriver'
jdbcURL = f"jdbc:teradata://{mosaic_cpd_server}"
sql_query_text = f"""Select * from cert_MOSAIC_DB.PRISM_CONTRCT_INFO"""
df = spark.read \
.format('jdbc') \
.option('driver', driver) \
.option('url', jdbcURL) \
.option('query', sql_query_text) \
.option('user', mosaic_cpd_user) \
.option('password', mosaic_cpd_passwd)\
.load()
df = df.drop_duplicates()
print(df.count())
view_name = "PRISM_CONTRCT_INFO"
print(view_name)
df.createOrReplaceTempView(view_name)
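# Usage sketch: the temp view can now be queried with Spark SQL
display(spark.sql(f"select * from {view_name} limit 10"))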
============================================================
==================dropDuplicate()===========================
df_accept = df_accept.dropDuplicates(['AccountAnalystName',
                                      'AccountMgrEmployeeID',
                                      'AccountMgrName',
                                      'AccountType',
                                      'Website',
                                      'TaxNumber'])
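# Hypothetical companion sketch: if the pre-dedup frame was kept (df_raw here is an assumption), the dropped rows can be recovered
df_reject = df_raw.exceptAll(df_accept)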
===========================Counts per column=================================
# Count Null/None values in every DataFrame column (the comprehension variable is just the column name)
from pyspark.sql.functions import col, isnan, when, count
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
# Count non-null values in every DataFrame column
df.select([count(col(c)).alias(c) for c in df.columns]).show()
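# Sketch under an assumption: NaN counts only make sense for float/double columns, so restrict to those
numeric_cols = [f.name for f in df.schema.fields if f.dataType.simpleString() in ("float", "double")]
df.select([count(when(isnan(col(c)), c)).alias(c) for c in numeric_cols]).show()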
=====================String to date conversion=========
df=df.withColumn("CONTRCT_EFF_DT",to_date(to_date("CONTRCT_EFF_DT","MM/dd/yyyy"),'yyyy-MM-dd'))
df=df.withColumn("CONTRCT_EXP_DT",to_date(to_date("CONTRCT_EXP_DT","MM/dd/yyyy"),'yyyy-MM-dd'))
display(df)
======================================================