Apache Spark does not provide some of the most fundamental features of a data processing system, such as ACID transactions and other data management functions. To fill this gap, Databricks (the company founded by the original creators of Apache Spark) released Delta Lake, an open-source storage layer for Spark.
Delta Lake is a storage layer that sits on top of your existing HDFS cluster or cloud storage, and it can even run on your PC. Data is stored on that underlying storage as versioned Parquet files. Instead of interacting with the storage layer directly, our programs read and write data through the Delta Lake API. Delta Lake provides a unified platform that supports both batch processing and stream processing workloads on a single platform, acting as an intermediate service between the compute and storage layers.
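
For example, here is a minimal sketch of writing and reading through the Delta Lake API. It assumes an existing SparkSession named spark with the Delta Lake package on the classpath (as in the later examples); the target path is hypothetical.

import org.apache.spark.sql.SaveMode

// hypothetical target location for the Delta table
val path = "/tmp/delta/quickstart"

// the data lands as versioned Parquet files plus a _delta_log transaction log
spark.range(0, 5).write.format("delta").mode(SaveMode.Overwrite).save(path)

// reads go through the same API instead of touching the files directly
spark.read.format("delta").load(path).show()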

Key features of Delta Lake:

  1. ACID Transactions
  2. Schema Enforcement
  3. Data versioning/Time Travel
  4. DML Support
  5. Unified Batch and Streaming Sink

ACID Transactions:

The DeltaLog is the core of Delta Lake; it guarantees the atomicity, consistency, isolation, and durability of transactions initiated by clients.
The DeltaLog is an ordered record of transactions. Every transaction performed since the creation of a Delta Lake table has an entry in the DeltaLog (also called the Delta Lake transaction log). It acts as a single source of truth, giving users access to the latest version of the Delta table's state, and it provides serializability, the strongest level of isolation.
Atomicity and Consistency
Delta Lake breaks each operation performed by a client into atomic commits, which are themselves composed of actions. A commit is recorded in the DeltaLog only when all of its actions complete successfully; if the job fails partway through, no commit is recorded.

Batch 1 Job:

import org.apache.spark.sql.SaveMode

val data = spark.range(0, 10)
data.write.mode(SaveMode.Overwrite).format("delta").save(path)

Executing Batch Job 1 writes the ten integers as Parquet files and records one commit. The commit JSON file is created in the _delta_log directory.

Sample Commit log file content:

{"commitInfo":{"timestamp":1591000080757,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputBytes":"1617","numOutputRows":"2"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"c2f668ea-ecc0-4052-9b29-b26fd64c1b4c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1591000079645}}
{"add":{"path":"part-00000-9d7cb856-af9d-43a1-ad98-81eed128b943-c000.snappy.parquet","partitionValues":{},"size":341,"modificationTime":1591000080000,"dataChange":true}}
{"add":{"path":"part-00003-96e61c33-6f5e-4015-bf51-a99273958748-c000.snappy.parquet","partitionValues":{},"size":648,"modificationTime":1591000080000,"dataChange":true}}
{"add":{"path":"part-00007-b0e03831-819f-46ad-9596-06527ed81b11-c000.snappy.parquet","partitionValues":{},"size":628,"modificationTime":1591000080000,"dataChange":true}}

Batch 2 Job:
Now, let's create another job that performs the same task but raises an exception partway through.

import scala.util.Try
import spark.implicits._

// fail on purpose partway through, so the write never completes a commit
Try(spark.range(0, 10).coalesce(1).map(i => {
  if (i.toLong > 5) {
    Thread.sleep(5000)
    throw new RuntimeException("Create Exception")
  }
  i
}).write.mode(SaveMode.Overwrite).format("delta").save(path)).get

The failed job did not create a commit log entry, and the previous commit file still exists in the target location, so the table remains in a valid state. This ensures atomicity and consistency.
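
A quick check, reusing the spark session and path from the jobs above, confirms that only Batch 1's successful commit is visible:

// the failed Batch 2 job added no commit to the DeltaLog,
// so the read still reflects only Batch 1's data
println(spark.read.format("delta").load(path).count())  // expected: 10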

Isolation & Durability
Delta Lake handles concurrent read/write access by managing the concurrency of commits. It does this with optimistic concurrency control (a conceptual sketch follows below). This means that:

  • when a commit execution starts, the writer snapshots the current DeltaLog.
  • once the commit's actions complete, the writer checks whether the DeltaLog has been updated by another writer in the meantime.
    • if not, it records the commit in the DeltaLog.
    • otherwise, it refreshes its view of the Delta table and attempts to register the commit again, after a reprocessing step if needed.

This ensures the Isolation property.
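
The following is only a conceptual sketch of that retry loop, not Delta Lake's internal API; an in-memory version counter stands in for the DeltaLog.

import java.util.concurrent.atomic.AtomicLong

object OptimisticCommitSketch extends App {
  // conceptual stand-in for the DeltaLog's latest version number,
  // not Delta Lake's real implementation
  val logVersion = new AtomicLong(0L)

  def commit(actions: () => Unit): Long = {
    var committedVersion = -1L
    var done = false
    while (!done) {
      val snapshot = logVersion.get()   // 1. snapshot the current DeltaLog version
      actions()                         // 2. perform the commit's actions
      // 3. record the commit only if no other writer advanced the log meanwhile;
      //    otherwise loop: refresh the table view, reprocess if needed, and retry
      done = logVersion.compareAndSet(snapshot, snapshot + 1)
      if (done) committedVersion = snapshot + 1
    }
    committedVersion
  }

  println(commit(() => ()))  // registers as version 1
}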

All transactions made on Delta Lake tables are stored directly to disk. This satisfies the ACID property of durability: committed data persists even in the event of a system failure.

Schema Enforcement:

The schema enforcement feature helps prevent bad data from being ingested into our data lakes by letting us specify a schema and enforcing it. It prevents data corruption by rejecting bad data, with sensible error messages, before it ever enters the data lake.
By default, Spark only validates the schema when data is read, not when it is written, which creates serious data-consistency issues. These are tricky to troubleshoot, and we end up with questions like:

  • Which job is the faulty one?
  • Which data file is causing the problem?
  • How many rows are faulty?
  • How do we correct them?
  • How long will it take to bring the system back to a consistent state?
  • When can we restart the failing jobs?

Example of a Schema Issue:
Create Spark Session

# Create Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder. \
                appName("schemaEnforceMentIssue"). \
                config("spark.executor.memory", "3g"). \
                config("spark.executor.cores", 3). \
                config("spark.cores.max", 6). \
                config('spark.jars.packages', 'io.delta:delta-core_2.11:0.6.0'). \
                master("local[*]"). \
                getOrCreate()

Create Batch1 DataFrame

batch1_df = spark.createDataFrame([(1, "srini"),(2, "Ram")] , ["id", "name"])
batch1_df.show()
+---+-----+
| id| name|
+---+-----+
|  1|srini|
|  2|  Ram|
+---+-----+
batch1_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

Create Batch2 DataFrame

batch2_df = spark.createDataFrame([(1.0, "srinivasan"),(2.0, "Rama")] , ["id", "name"])
batch2_df.show()
+---+----------+
| id|      name|
+---+----------+
|1.0|srinivasan|
|2.0|      Rama|
+---+----------+
batch2_df.printSchema()
root
 |-- id: double (nullable = true)
 |-- name: string (nullable = true)

When we write the second batch to the target location, it should throw a data-integrity exception, but with plain Spark writing Parquet, that does not happen:

path="/home/srinivasan/Desktop/topic_model/out_data"
batch1_df.write.mode("append").parquet(path)
batch2_df.write.mode("append").parquet(path)

Now, when we try to read the data back from the target location, we end up with a data-integrity exception:

spark.read.parquet(path).show()
---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

<ipython-input-15-c9f373cf415f> in <module>
----> 1 spark.read.parquet(path).show()

/usr/local/lib/python3.6/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(


Py4JJavaError: An error occurred while calling o165.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 71, localhost, executor driver): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///home/srinivasan/Desktop/topic_model/out_data/part-00003-76b90f76-0306-4976-9663-a09494d692dd-c000.snappy.parquet. Column: [id], Expected: bigint, Found: DOUBLE

Delta Lake Schema on Write
When we write the batch2 data to the target location through Delta Lake, it throws a schema-mismatch exception at write time, ensuring that bad records are never inserted:

path="/home/srinivasan/Desktop/topic_model/out_data"
batch1_df.write.format("delta").mode("append").save(path)
batch2_df.write.format("delta").mode("append").save(path)
---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

/usr/local/lib/python3.6/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:


/usr/local/lib/python3.6/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:


Py4JJavaError: An error occurred while calling o276.save.
: org.apache.spark.sql.AnalysisException: Failed to merge fields 'id' and 'id'. Failed to merge incompatible data types LongType and DoubleType;;

Data versioning/Time Travel:

Data versioning enables rollbacks, full historical audit trails, and reproducible machine learning experiments.
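
For example, assuming the spark session and Delta table path from the earlier jobs, an older snapshot can be read back by version number or timestamp, and the commit history inspected from the DeltaLog (the timestamp below is only illustrative):

import io.delta.tables.DeltaTable

// read the table as it was at a specific commit version
spark.read.format("delta").option("versionAsOf", 0).load(path).show()

// or as it was at a point in time (illustrative date)
spark.read.format("delta").option("timestampAsOf", "2020-06-01").load(path).show()

// list the commit history recorded in the DeltaLog
DeltaTable.forPath(spark, path).history().show()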

DML Support:

Delta Lake supports merge, update, and delete operations to enable complex use cases such as change data capture (CDC), slowly changing dimensions (SCD), and streaming upserts. The Scala job below demonstrates delete, update, and merge against an existing Delta table.

package com.pramati.lake

import org.apache.spark.sql.SparkSession
import java.io.File
import io.delta.tables._
import org.apache.spark.sql.functions._

object Basic_curd extends App {

  val spark = SparkSession
    .builder()
    .appName("firstBasic")
    .master("local[*]")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  val file = new File("/home/srinivasan/Desktop/topic_model/delta/delta-table_basic")
  val path = file.getCanonicalPath
  val delta = DeltaTable.forPath(path)

  //Delete Operation
  delta.delete("id>9")

  // Update Operation
  delta.update(
    condition = expr("id % 2 == 0"),
    set = Map("id" -> expr("id + 100")))

  val newData = spark.range(0, 20).toDF
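
  // Merge (upsert): update rows whose id matches newData, insert the rest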

  delta.as("oldData")
    .merge(
      newData.as("newData"),
      "oldData.id = newData.id")
    .whenMatched
    .update(Map("id" -> col("newData.id")))
    .whenNotMatched
    .insert(Map("id" -> col("newData.id")))
    .execute()
  import spark.implicits._
  delta.toDF.orderBy($"id").show()

}

Unified Batch and Streaming Sink:

In a data lake where we have both stream processing and batch processing use cases, it is common to follow a Lambda architecture. With Delta Lake, data arriving as a stream (for example, from Kafka) and historical data (say, on HDFS) land in the same table, which gives a unified view across these two different paradigms. Streaming data ingest, batch historic backfill, and interactive queries work out of the box without much extra effort, as sketched below.
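
As a rough sketch, assuming the spark session and Delta table path from the earlier examples (the rate source and checkpoint location below are placeholders for a real stream such as Kafka):

import spark.implicits._

// continuously append a stream into the same Delta table
val streamingQuery = spark.readStream
  .format("rate")                      // stand-in for a Kafka or file stream
  .option("rowsPerSecond", 10)
  .load()
  .select($"value".as("id"))
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", path + "/_checkpoints/rate")  // hypothetical location
  .start(path)

// the very same table still answers batch and interactive queries...
spark.read.format("delta").load(path).count()

// ...and can be consumed as a stream by downstream jobs
val downstream = spark.readStream.format("delta").load(path)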
