How to split a dataset into two datasets, one with unique rows and one with duplicate rows?























I want to extract duplicate records from a Spark Scala DataFrame. For example, I want to find duplicates based on 3 columns such as "id", "name", and "age". The set of columns in the condition is dynamic input and can contain any number of columns. Based on the values in those columns, I want to extract the duplicate records.



Below is the code I have tried. It handles only one column; I don't know how to extend it to more than one column.



My Code:



var s = "age|id|name " // Note: this is dynamic input, so the number of columns can grow or shrink
var columnNames = s.replace('|', ',')

val findDuplicateRecordsDF = spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
  .over(Window.partitionBy($"id"))) // how can more than one column be added here? (dynamic input)
  .where($"count" > 1)
  .show()


Input Dataframe: (findDuplicateRecordsDF.show())



-----------------------------------------------------
| id | name   | age | phone      | email_id         |
|---------------------------------------------------|
| 3  | sam    | 23  | 9876543210 | sam@yahoo.com    |
| 7  | ram    | 27  | 8765432190 | ram@gmail.com    |
| 3  | sam    | 28  | 9876543210 | sam@yahoo.com    |
| 6  | haris  | 30  | 6543210777 | haris@gmail.com  |
| 9  | ram    | 27  | 8765432130 | ram94@gmail.com  |
| 6  | haris  | 24  | 6543210777 | haris@gmail.com  |
| 4  | karthi | 26  | 4321066666 | karthi@gmail.com |
-----------------------------------------------------


Here I want to extract duplicate records based on 4 columns (id, name, phone, email_id). The DataFrame above is only an example; the original DataFrame can contain any number of columns.



The output DataFrames should be:

  1. Duplicate records output:

    -----------------------------------------------------
    | id | name   | age | phone      | email_id         |
    |---------------------------------------------------|
    | 3  | sam    | 23  | 9876543210 | sam@yahoo.com    |
    | 3  | sam    | 28  | 9876543210 | sam@yahoo.com    |
    | 6  | haris  | 30  | 6543210777 | haris@gmail.com  |
    | 6  | haris  | 24  | 6543210777 | haris@gmail.com  |
    -----------------------------------------------------

  2. Unique records output:

    -----------------------------------------------------
    | id | name   | age | phone      | email_id         |
    |---------------------------------------------------|
    | 7  | ram    | 27  | 8765432190 | ram@gmail.com    |
    | 9  | ram    | 27  | 8765432130 | ram94@gmail.com  |
    | 4  | karthi | 26  | 4321066666 | karthi@gmail.com |
    -----------------------------------------------------



Thanks in advance.


































  • You can specify a comma-separated list of columns in partitionBy().
    – vindev
    Nov 22 at 5:40
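The comment above can be turned into a DataFrame-API sketch in which the partition columns come from the dynamic input string. The names `SplitByDuplicates`, `parseKeys`, `split`, and the helper column `_cnt` are hypothetical, chosen only for illustration; the sample rows are the ones from the question, and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

object SplitByDuplicates {

  /** Turns pipe-delimited input such as "id|name|phone " into trimmed column names. */
  def parseKeys(raw: String): Seq[String] =
    raw.split('|').map(_.trim).filter(_.nonEmpty).toSeq

  /** Returns (duplicates, uniques); both keep every original column. */
  def split(df: DataFrame, keys: Seq[String]): (DataFrame, DataFrame) = {
    require(keys.nonEmpty, "at least one key column is required")
    // partitionBy takes Column varargs, so a dynamic list is passed with `: _*`
    val byKeys  = Window.partitionBy(keys.map(col): _*)
    // `_cnt` is a hypothetical helper column holding the size of each key group
    val counted = df.withColumn("_cnt", count(lit(1)).over(byKeys))
    val duplicates = counted.where(col("_cnt") > 1).drop("_cnt")
    val uniques    = counted.where(col("_cnt") === 1).drop("_cnt")
    (duplicates, uniques)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("split-dups").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (3, "sam", 23, "9876543210", "sam@yahoo.com"),
      (7, "ram", 27, "8765432190", "ram@gmail.com"),
      (3, "sam", 28, "9876543210", "sam@yahoo.com"),
      (6, "haris", 30, "6543210777", "haris@gmail.com"),
      (9, "ram", 27, "8765432130", "ram94@gmail.com"),
      (6, "haris", 24, "6543210777", "haris@gmail.com"),
      (4, "karthi", 26, "4321066666", "karthi@gmail.com")
    ).toDF("id", "name", "age", "phone", "email_id")

    val (duplicates, uniques) = split(df, parseKeys("id|name|phone|email_id"))
    duplicates.show(false)
    uniques.show(false)
    spark.stop()
  }
}
```

Because the count is attached as an extra column and dropped afterwards, columns outside the key (such as age) survive in both results.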






















scala apache-spark apache-spark-sql














edited Nov 25 at 19:42









Jacek Laskowski











asked Nov 22 at 4:42









Revathi

























2 Answers




















accepted










You can use window functions and build the partition-by clause from the list of column names:



scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name  |age|phone     |email_id        |cnt|
+---+------+---+----------+----------------+---+
|4  |karthi|26 |4321066666|karthi@gmail.com|1  |
|7  |ram   |27 |8765432190|ram@gmail.com   |1  |
|9  |ram   |27 |8765432130|ram94@gmail.com |1  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |2  |
|3  |sam   |28 |9876543210|sam@yahoo.com   |2  |
|6  |haris |30 |6543210777|haris@gmail.com |2  |
|6  |haris |24 |6543210777|haris@gmail.com |2  |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")


// Duplicates (cnt > 1 also covers keys that occur more than twice)

scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt > 1").show
+---+-----+----------+---------------+
| id| name|     phone|       email_id|
+---+-----+----------+---------------+
|  3|  sam|9876543210|  sam@yahoo.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  6|haris|6543210777|haris@gmail.com|
|  6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+


// Unique



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id|  name|     phone|        email_id|
+---+------+----------+----------------+
|  4|karthi|4321066666|karthi@gmail.com|
|  7|   ram|8765432190|   ram@gmail.com|
|  9|   ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+
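The queries above select only the key columns, so age is not in the output. As an alternative (a different technique from the window function used in this answer), the split can be sketched with groupBy plus semi and anti joins, which keeps every original column. `SplitByJoin` and `split` are hypothetical names, and a local SparkSession is assumed. One caveat under this approach: a row with a null in any key column never matches in the join, so it ends up in the "unique" result.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SplitByJoin {

  /** Returns (duplicates, uniques) using groupBy and left_semi / left_anti joins. */
  def split(df: DataFrame, keys: Seq[String]): (DataFrame, DataFrame) = {
    // key combinations that occur more than once
    val dupKeys = df.groupBy(keys.map(col): _*).count()
      .where(col("count") > 1)
      .select(keys.map(col): _*)
    val duplicates = df.join(dupKeys, keys, "left_semi") // rows whose key is in dupKeys
    val uniques    = df.join(dupKeys, keys, "left_anti") // rows whose key is not
    (duplicates, uniques)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("split-join").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (3, "sam", 23, "9876543210", "sam@yahoo.com"),
      (7, "ram", 27, "8765432190", "ram@gmail.com"),
      (3, "sam", 28, "9876543210", "sam@yahoo.com"),
      (4, "karthi", 26, "4321066666", "karthi@gmail.com")
    ).toDF("id", "name", "age", "phone", "email_id")

    val (duplicates, uniques) = split(df, Seq("id", "name", "phone", "email_id"))
    duplicates.show(false)
    uniques.show(false)
    spark.stop()
  }
}
```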


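EDIT2: To return only the repeated occurrences in each group (every row after the first), add a row_number() column and filter on it: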



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show


Results:



+---+-----+----------+---------------+
| id| name|     phone|       email_id|
+---+-----+----------+---------------+
|  6|haris|6543210777|haris@gmail.com|
|  6|haris|6543210777|haris@gmail.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  9|  ram|8765432190|  ram@gmail.com|
+---+-----+----------+---------------+
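To make the `cnt > 1 and rwn > 1` filter concrete, here is a minimal plain-Scala sketch (no Spark) that mimics the two window columns on ordinary collections. `WindowSemantics`, `Contact`, `extraCopies`, and `sample` are hypothetical names. Since `rwn > 1` can only hold in a group with more than one row, the `cnt > 1` condition is implied by it. In Spark, which row of a group gets `rwn = 1` is not guaranteed when the order-by columns are the same as the partition columns; the sketch simply uses input order.

```scala
object WindowSemantics {

  final case class Contact(id: Int, name: String, age: Int, phone: String, email: String)

  /** Every row after the first one in each (name, phone, email) group. */
  def extraCopies(rows: Seq[Contact]): Seq[Contact] =
    rows.groupBy(r => (r.name, r.phone, r.email)).values
      .filter(_.size > 1)  // corresponds to cnt > 1
      .flatMap(_.drop(1))  // corresponds to rwn > 1
      .toSeq

  // Same rows as the EDIT2 example
  val sample: Seq[Contact] = Seq(
    Contact(4, "karthi", 26, "4321066666", "karthi@gmail.com"),
    Contact(6, "haris", 24, "6543210777", "haris@gmail.com"),
    Contact(7, "ram", 27, "8765432190", "ram@gmail.com"),
    Contact(9, "ram", 27, "8765432190", "ram@gmail.com"),
    Contact(6, "haris", 24, "6543210777", "haris@gmail.com"),
    Contact(3, "sam", 23, "9876543210", "sam@yahoo.com"),
    Contact(3, "sam", 23, "9876543210", "sam@yahoo.com"),
    Contact(3, "sam", 28, "9876543210", "sam@yahoo.com"),
    Contact(6, "haris", 30, "6543210777", "haris@gmail.com")
  )

  def main(args: Array[String]): Unit =
    extraCopies(sample).foreach(println)
}
```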


EDIT3: Null condition check (rows with a null in any key column are excluded before counting)



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

import org.apache.spark.sql.functions.{col, when}

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
// columns that are not part of the duplicate key
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
// despite its name, null_count holds the number of NON-null key columns in each row
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
// keep only rows in which every key column is non-null
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
// rwn > 1 keeps every row after the first in each duplicate group
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)


Results:



+---+------+---+----------+----------------+----------+
|id |name  |age|phone     |email_id        |null_count|
+---+------+---+----------+----------------+----------+
|4  |karthi|26 |4321066666|karthi@gmail.com|3         |
|6  |haris |30 |6543210777|haris@gmail.com |3         |
|6  |haris |30 |null      |haris@gmail.com |2         |
|7  |ram   |27 |8765432190|ram@gmail.com   |3         |
|9  |ram   |27 |8765432190|ram@gmail.com   |3         |
|6  |haris |24 |6543210777|haris@gmail.com |3         |
|6  |null  |24 |6543210777|null            |1         |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |
|3  |sam   |28 |9876543210|sam@yahoo.com   |3         |
|6  |haris |24 |6543210777|haris@gmail.com |3         |
+---+------+---+----------+----------------+----------+

+---+------+---+----------+----------------+----------+---+---+
|id |name  |age|phone     |email_id        |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6  |haris |30 |6543210777|haris@gmail.com |3         |3  |1  |
|6  |haris |24 |6543210777|haris@gmail.com |3         |3  |2  |
|6  |haris |24 |6543210777|haris@gmail.com |3         |3  |3  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |3  |1  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |3  |2  |
|3  |sam   |28 |9876543210|sam@yahoo.com   |3         |3  |3  |
|7  |ram   |27 |8765432190|ram@gmail.com   |3         |2  |1  |
|9  |ram   |27 |8765432190|ram@gmail.com   |3         |2  |2  |
|4  |karthi|26 |4321066666|karthi@gmail.com|3         |1  |1  |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone     |email_id       |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6  |24 |
|haris|6543210777|haris@gmail.com|6  |24 |
|sam  |9876543210|sam@yahoo.com  |3  |23 |
|sam  |9876543210|sam@yahoo.com  |3  |28 |
|ram  |8765432190|ram@gmail.com  |9  |27 |
+-----+----------+---------------+---+---+


Blank check (also treat empty or whitespace-only values as missing):

val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^\s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )
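As a rough illustration of which values the blank check treats as missing, the same rule can be mirrored in plain Scala, assuming the intended pattern is `^\s*$` (empty or whitespace only). `BlankCheck`, `isMissing`, and `populatedCount` are hypothetical names; this sketch does not touch Spark and only demonstrates the matching rule.

```scala
object BlankCheck {

  // Matches the empty string or a string made up only of whitespace
  private val Blank = """^\s*$""".r

  /** True for null, "", and whitespace-only values. */
  def isMissing(value: String): Boolean =
    value == null || Blank.pattern.matcher(value).matches()

  /** Number of populated values, analogous to the per-row sum over the key columns. */
  def populatedCount(values: Seq[String]): Int =
    values.count(v => !isMissing(v))

  def main(args: Array[String]): Unit = {
    println(isMissing("   "))                           // whitespace only
    println(isMissing("sam"))                           // populated
    println(populatedCount(Seq("haris", null, "  ")))   // one populated value
  }
}
```

A row would then count as fully populated only when `populatedCount` equals the number of key columns, which is the comparison the `null_count = dup_cols_length` filter performs.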




























  • I am getting the exception below at spark-submit time: == SQL == select *, count(*) over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
    – Revathi
    Nov 27 at 7:41










  • seems dup_cols_qry is empty string.. check again
    – stack0114106
    Nov 27 at 8:40










  • looks like you posted one more question and marked it as duplicate..
    – stack0114106
    Nov 27 at 8:52










  • yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"
    – Revathi
    Nov 27 at 9:29












  • good..glad that it worked
    – stack0114106
    Nov 27 at 9:32
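The parse error discussed in these comments comes from a partition clause that does not contain plain column names. A small guard when building the SQL fragment can surface that earlier; the following is only a sketch, with `DupQuery` and `countClause` as hypothetical names. It trims each name, rejects an empty list, and wraps names in backticks, which is how Spark SQL quotes identifiers.

```scala
object DupQuery {

  /** Builds the count-over-partition fragment from a dynamic list of column names. */
  def countClause(cols: Seq[String]): String = {
    val cleaned = cols.map(_.trim).filter(_.nonEmpty)
    // Fail fast instead of sending a malformed query to spark.sql
    require(cleaned.nonEmpty, "no partition columns supplied")
    cleaned.map(c => s"`$c`")
      .mkString("count(*) over (partition by ", ", ", ") as cnt")
  }

  def main(args: Array[String]): Unit = {
    // Dynamic input in the pipe-delimited form used in the question
    val cols = "age|id|name ".split('|').toSeq
    println("select *, " + countClause(cols) + " from contact")
  }
}
```

Printing the assembled query before passing it to `spark.sql` makes it easy to see whether the column list was interpolated as expected.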































You need to pass the column names to partitionBy as a comma-separated argument list.

// col1, col2, .. must be column names of type String
val window = Window.partitionBy(col1, col2, ..)

findDuplicateRecordsDF.withColumn("count", count("*").over(window))
  .where($"count" > 1)
  .show()




























  • The input contains N columns; the number is dynamic.
    – Revathi
    Nov 22 at 5:56











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53424011%2fhow-to-split-dataset-to-two-datasets-with-unique-and-duplicate-rows-each%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























2 Answers
2






active

oldest

votes








2 Answers
2






active

oldest

votes









active

oldest

votes






active

oldest

votes








up vote
1
down vote



accepted










You can use window functions. Check this out



scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")


//Duplicates



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+


// Unique



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+


EDIT2:



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show


Results:



+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+


EDIT3: - Null condition check



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)


Results:



+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+


|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+


blank check



val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )





share|improve this answer























  • I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
    – Revathi
    Nov 27 at 7:41










  • seems dup_cols_qry is empty string.. check again
    – stack0114106
    Nov 27 at 8:40










  • looks like you posted one more question and marked it as duplicate..
    – stack0114106
    Nov 27 at 8:52










  • yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"
    – Revathi
    Nov 27 at 9:29












  • good..glad that it worked
    – stack0114106
    Nov 27 at 9:32















up vote
1
down vote



accepted










You can use window functions. Check this out



scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name  |age|phone     |email_id        |cnt|
+---+------+---+----------+----------------+---+
|4  |karthi|26 |4321066666|karthi@gmail.com|1  |
|7  |ram   |27 |8765432190|ram@gmail.com   |1  |
|9  |ram   |27 |8765432130|ram94@gmail.com |1  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |2  |
|3  |sam   |28 |9876543210|sam@yahoo.com   |2  |
|6  |haris |30 |6543210777|haris@gmail.com |2  |
|6  |haris |24 |6543210777|haris@gmail.com |2  |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")


// Duplicates (cnt > 1 also catches groups of three or more rows)



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt > 1").show
+---+-----+----------+---------------+
| id| name|     phone|       email_id|
+---+-----+----------+---------------+
|  3|  sam|9876543210|  sam@yahoo.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  6|haris|6543210777|haris@gmail.com|
|  6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+


// Unique



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id|  name|     phone|        email_id|
+---+------+----------+----------------+
|  4|karthi|4321066666|karthi@gmail.com|
|  7|   ram|8765432190|   ram@gmail.com|
|  9|   ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+
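
The same split can also be written with the DataFrame API, which avoids building SQL strings and takes the key columns as an ordinary list. This is only a sketch under assumptions: the object name SplitDuplicates and the helper split are hypothetical (they are not part of the answer), the DataFrame is assumed to have no column already called cnt, and a local SparkSession is created just for the demonstration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

object SplitDuplicates {
  // Hypothetical helper (not from the answer): returns (duplicates, uniques).
  // Assumes df has no column already named "cnt".
  def split(df: DataFrame, keyCols: Seq[String]): (DataFrame, DataFrame) = {
    require(keyCols.nonEmpty, "at least one key column is required")
    // partitionBy accepts a variable number of Column arguments,
    // so a dynamic list of names can be expanded with ": _*".
    val byKey = Window.partitionBy(keyCols.map(col): _*)
    val counted = df.withColumn("cnt", count(lit(1)).over(byKey))
    val duplicates = counted.where(col("cnt") > 1).drop("cnt")
    val uniques = counted.where(col("cnt") === 1).drop("cnt")
    (duplicates, uniques)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("split-duplicates")
      .getOrCreate()
    import spark.implicits._

    // Same sample rows as in the answer above.
    val df = Seq(
      (3, "sam", 23, "9876543210", "sam@yahoo.com"),
      (7, "ram", 27, "8765432190", "ram@gmail.com"),
      (3, "sam", 28, "9876543210", "sam@yahoo.com"),
      (6, "haris", 30, "6543210777", "haris@gmail.com"),
      (9, "ram", 27, "8765432130", "ram94@gmail.com"),
      (6, "haris", 24, "6543210777", "haris@gmail.com"),
      (4, "karthi", 26, "4321066666", "karthi@gmail.com")
    ).toDF("id", "name", "age", "phone", "email_id")

    val (dups, uniq) = split(df, Seq("id", "name", "phone", "email_id"))
    dups.show(false)
    uniq.show(false)
    spark.stop()
  }
}
```

Unlike the SQL version, this keeps every original column (including age) in both results; select only the key columns afterwards if that is what you need.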


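EDIT2: to list only the extra copies (every row after the first in each group), add row_number() over the same partition and keep rows with rwn > 1. Which row counts as the first is arbitrary, because the ORDER BY columns are identical within a partition.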



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show


Results:



+---+-----+----------+---------------+
| id| name|     phone|       email_id|
+---+-----+----------+---------------+
|  6|haris|6543210777|haris@gmail.com|
|  6|haris|6543210777|haris@gmail.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  9|  ram|8765432190|  ram@gmail.com|
+---+-----+----------+---------------+


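EDIT3: Null check. Rows with a null in any duplicate-key column are excluded before counting. Note that null_count holds the number of non-null key columns, so a complete row has null_count equal to dup_cols_length.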



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)


Results:



+---+------+---+----------+----------------+----------+
|id |name  |age|phone     |email_id        |null_count|
+---+------+---+----------+----------------+----------+
|4  |karthi|26 |4321066666|karthi@gmail.com|3         |
|6  |haris |30 |6543210777|haris@gmail.com |3         |
|6  |haris |30 |null      |haris@gmail.com |2         |
|7  |ram   |27 |8765432190|ram@gmail.com   |3         |
|9  |ram   |27 |8765432190|ram@gmail.com   |3         |
|6  |haris |24 |6543210777|haris@gmail.com |3         |
|6  |null  |24 |6543210777|null            |1         |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |
|3  |sam   |28 |9876543210|sam@yahoo.com   |3         |
|6  |haris |24 |6543210777|haris@gmail.com |3         |
+---+------+---+----------+----------------+----------+


+---+------+---+----------+----------------+----------+---+---+
|id |name  |age|phone     |email_id        |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6  |haris |30 |6543210777|haris@gmail.com |3         |3  |1  |
|6  |haris |24 |6543210777|haris@gmail.com |3         |3  |2  |
|6  |haris |24 |6543210777|haris@gmail.com |3         |3  |3  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |3  |1  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |3  |2  |
|3  |sam   |28 |9876543210|sam@yahoo.com   |3         |3  |3  |
|7  |ram   |27 |8765432190|ram@gmail.com   |3         |2  |1  |
|9  |ram   |27 |8765432190|ram@gmail.com   |3         |2  |2  |
|4  |karthi|26 |4321066666|karthi@gmail.com|3         |1  |1  |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone     |email_id       |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6  |24 |
|haris|6543210777|haris@gmail.com|6  |24 |
|sam  |9876543210|sam@yahoo.com  |3  |23 |
|sam  |9876543210|sam@yahoo.com  |3  |28 |
|ram  |8765432190|ram@gmail.com  |9  |27 |
+-----+----------+---------------+---+---+


Blank check: to also treat empty or whitespace-only strings as missing, define df_null_col as follows.



val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^\s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )
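
The null and blank checks can also be expressed as a single row filter rather than a counter column. The sketch below is one possible way to do it, not the answer's own method: the names KeyPresence, isPresent and withCompleteKeys are hypothetical, each key column is cast to string before matching (an assumption so that non-string keys work too), and the local SparkSession exists only for the demonstration.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, not}

object KeyPresence {
  // True when the column is neither null nor empty/whitespace-only.
  def isPresent(name: String): Column =
    col(name).isNotNull && not(col(name).cast("string").rlike("""^\s*$"""))

  // Hypothetical helper: keep only rows where every key column is present.
  def withCompleteKeys(df: DataFrame, keyCols: Seq[String]): DataFrame = {
    require(keyCols.nonEmpty, "at least one key column is required")
    df.where(keyCols.map(isPresent).reduce(_ && _))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("key-presence")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("haris", "6543210777", "haris@gmail.com"), // complete key
      ("haris", null, "haris@gmail.com"),         // null phone
      ("ram", "   ", "ram@gmail.com")             // blank phone
    ).toDF("name", "phone", "email_id")

    // Only the first row has all three key columns filled in.
    withCompleteKeys(df, Seq("name", "phone", "email_id")).show(false)
    spark.stop()
  }
}
```

The filtered DataFrame can then be registered as the temp view that the count and row_number queries read from.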








































edited Dec 4 at 3:39

























answered Nov 22 at 6:53









stack0114106

























up vote
0
down vote













Pass the column names to Window.partitionBy as comma-separated arguments.



// col1, col2, ... are column names of type String.
val window = Window.partitionBy(col1, col2 /* , ... */)


findDuplicateRecordsDF.withColumn("count", count("*").over(window))
  .where($"count" > 1)
  .show()
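
When the column list arrives as a pipe-delimited string, as in the question, it first has to be turned into a clean list of names. The sketch below shows one way to do that in plain Scala and to build the windowed count clause from it. The names DynamicColumns, parseColumns and countOverClause are hypothetical, and the guard against an empty list is an added assumption so that a malformed input fails early instead of producing invalid SQL.

```scala
object DynamicColumns {
  // Turn "age|id|name " into Seq("age", "id", "name"); blank entries are dropped.
  def parseColumns(raw: String): Seq[String] =
    raw.split('|').map(_.trim).filter(_.nonEmpty).toSeq

  // Build the windowed count expression for a Spark SQL select list.
  def countOverClause(cols: Seq[String]): String = {
    require(cols.nonEmpty, "no partition columns supplied")
    cols.mkString("count(*) over (partition by ", ", ", ") as cnt")
  }

  def main(args: Array[String]): Unit = {
    val cols = parseColumns("age|id|name ")
    println(cols.mkString(","))    // age,id,name
    println(countOverClause(cols)) // count(*) over (partition by age, id, name) as cnt
  }
}
```

With the DataFrame API the parsed list can be passed on directly, for example Window.partitionBy(cols.head, cols.tail: _*), since partitionBy has an overload that takes a first column name followed by any further names.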




























  • the input contain N no of columns .. Its dynamic value
    – Revathi
    Nov 22 at 5:56























edited Nov 22 at 6:33

























answered Nov 22 at 5:39









Taha Naqvi






























