PS: Cross-posted from blog.imaginea.com

In this post, we’ll see how implicit casting in Spark SQL can lead to unexpected and wrong results. Spark SQL follows MySQL and Hive in performing implicit type conversion for binary comparisons.

Suppose we have this DataFrame, with the schema shown below:

df.show
 
+---+---+----+
|_c0|_c1| _c2|
+---+---+----+
|  1|1.0|   1|
|  2|1.0|   s|
|  3|3.1|null|
+---+---+----+
df.printSchema
 
root
 |-- _c0: integer (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: string (nullable = true)
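
For reference, one way to build an equivalent DataFrame is sketched below (the original data actually comes from a CSV file, as the physical plan later in the post shows, so this reconstruction is only illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical reconstruction of the sample data used in this post.
val schema = StructType(Seq(
  StructField("_c0", IntegerType, nullable = true),
  StructField("_c1", DoubleType, nullable = true),
  StructField("_c2", StringType, nullable = true)
))
val rows = Seq(Row(1, 1.0, "1"), Row(2, 1.0, "s"), Row(3, 3.1, null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)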

Let’s run this query and see the result:

df.where("_c1==_c2").show
 
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|1.0|  1|
+---+---+---+

This is pretty cool. We don’t have to explicitly cast the column _c2 (a StringType) to compare it with the double column _c1. But wait, now let’s run this:

df.where("_c1<>_c2").show   or   df.where("_c1!=_c2").show 
 
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
+---+---+---+

There is something fishy here. As per general logic, the output should be:

+---+---+----+
|_c0|_c1| _c2|
+---+---+----+
|  2|1.0|   s|
|  3|3.1|null|
+---+---+----+
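
For completeness, getting that intuitive result means making the cast and the null handling explicit ourselves. One possible sketch uses the null-safe equality operator <=>:

import org.apache.spark.sql.functions.col

// Null-safe comparison: rows where the cast fails (null) or the values differ
// are kept instead of being silently dropped.
df.where(!(col("_c1") <=> col("_c2").cast("double"))).show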

This happens due to the same implicit casting logic. Let’s take the help of explain:

df.where("_c1=_c2").explain
 
== Physical Plan ==
*Project [_c0#263, _c1#264, _c2#265]
+- *Filter ((isnotnull(_c1#264) && isnotnull(_c2#265)) && (_c1#264 = cast(_c2#265 as double)))
   +- *Scan csv [_c0#263,_c1#264,_c2#265] Format: CSV, InputPaths: file:/home/bipulk/spark_bug.csv, PartitionFilters: [], PushedFilters: [IsNotNull(_c1), IsNotNull(_c2)], ReadSchema: struct<_c0:int,_c1:double,_c2:string>
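
In other words, the filter is effectively rewritten into the explicit form below (a sketch of the equivalent DataFrame expression, not code from the original job):

import org.apache.spark.sql.functions.col

// What the analyzer effectively executes: _c2 is cast to double before the comparison.
df.where(col("_c1") === col("_c2").cast("double")).show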

This implicit cast is added during the analysis phase of the logical plan, by the PromoteStrings rule in TypeCoercion.scala:

org.apache.spark.sql.catalyst.analysis.TypeCoercion
 
case p @ BinaryComparison(left @ StringType(), right) if right.dataType != StringType =>
  p.makeCopy(Array(Cast(left, DoubleType), right))
case p @ BinaryComparison(left, right @ StringType()) if left.dataType != StringType =>
  p.makeCopy(Array(left, Cast(right, DoubleType)))

and during the actual cast, the NumberFormatException is caught and null is returned, which is then silently dropped by the filter.

org.apache.spark.sql.catalyst.expressions.Cast
 
// DoubleConverter
private[this] def castToDouble(from: DataType): Any => Any = from match {
  case StringType =>
    buildCast[UTF8String](_, s => try s.toString.toDouble catch {
      case _: NumberFormatException => null
    })
  case BooleanType =>
    buildCast[Boolean](_, b => if (b) 1d else 0d)
  case DateType =>
    buildCast[Int](_, d => null)
  case TimestampType =>
    buildCast[Long](_, t => timestampToDouble(t))
  case x: NumericType =>
    b => x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)
}
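
You can see this behaviour directly from the SQL side (the column aliases here are just for readability):

// The failed cast silently becomes null, and the null then propagates
// through the comparison, so a filter on it drops the row.
spark.sql("select cast('s' as double) as casted, 1.0 = 's' as cmp").show
// expected: both casted and cmp come out as null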

Also, due to loss of precision, this kind of error can happen:

spark.sql("select 19157170390056973L == '19157170390056971'").show

+-----------------------------------------------------------------------+
|(CAST(19157170390056973 AS DOUBLE) = CAST(19157170390056971 AS DOUBLE))|
+-----------------------------------------------------------------------+
|                                                                   true|
+-----------------------------------------------------------------------+

because both values round to the same Double:

"19157170390056971".toDouble ==> 1.9157170390056972E16
19157170390056973L.toDouble  ==> 1.9157170390056972E16
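
Both literals are larger than 2^53, so they cannot be represented exactly as a Double; a quick check in any Scala REPL confirms they collapse to the same value:

val a = "19157170390056971".toDouble
val b = 19157170390056973L.toDouble
a == b   // true: both round to 1.9157170390056972E16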

Spark developers are working on a fix for this precision case; refer to https://github.com/apache/spark/pull/15880. But the first case is not covered by this fix.

So be careful while handling these kinds of cases where implicit casting happens. The ultimate fix would be strict type safety, but that would break Hive compatibility badly.

You are probably thinking that we can impose explicit type casting using UDFs, which could throw an exception in such cases, but the problem is the same with UDFs.

import org.apache.spark.sql.functions.udf

// The UDF declares an Int argument, so the analyzer casts the String column
// to int before the UDF is ever called (and s, being a primitive Int, can
// never actually be null here anyway).
val nullResolveFun: Int => Int = { s =>
  if (s == null) 0 else s
}
val nullResolve = udf(nullResolveFun)

df.withColumn("_c2", nullResolve('_c2)).where("_c1!=_c2").show

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
+---+---+---+

UDFs are also not type-safe: because the UDF declares an Int argument, the analyzer inserts a cast from string to int on _c2 before the UDF ever runs, so "s" is already null by then and the row is again silently filtered out.
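
If we really want strict behaviour, a typed Dataset gets closer, because the String-to-number conversion has to be written out and we decide what happens when it fails. A rough sketch (the case class and the Option-based handling are my own choices, not part of the original code):

import scala.util.Try
// in spark-shell, spark.implicits._ is already in scope for the encoder

case class Rec(_c0: Int, _c1: Double, _c2: String)

// The conversion from String is explicit here: "s" and null are handled by us
// instead of being silently turned into null by an implicit cast.
df.as[Rec]
  .filter(r => Option(r._c2).flatMap(v => Try(v.toDouble).toOption) != Some(r._c1))
  .show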