Here, we perform some standard imports:

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, DataFrame, SQLContext, Row
from pyspark.sql.window import Window
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, StringType, LongType, TimestampType

import pyspark.sql.functions as func

import pandas as pd
import datetime
import numpy as np
import timeit

When re-running the entire notebook, we first shut down any existing Spark context before restarting it below:

In [2]:
try:
    sc.stop()
except NameError:
    print("sc not defined.")
sc not defined.

Set up our Spark contexts:

In [3]:
conf = SparkConf()
conf.setAppName('Timestamp Performance Demonstration')
conf.set("hive.metastore.pre.event.listeners", "")

sc = SparkContext(conf=conf)

hadoopConf = sc._jsc.hadoopConfiguration()

hadoopConf.set('fs.azure', 'org.apache.hadoop.fs.azure.NativeAzureFileSystem')
hadoopConf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoopConf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoopConf.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sql_context = SQLContext(sc)

Data Generation

Finally, something interesting. This block creates two DataFrames of one million rows each. The only difference is that one holds a DateType column and the other a TimestampType column.

In [4]:
current_date = datetime.date.today()
million_dates = [Row(some_date=current_date + datetime.timedelta(days=i)) for i in range(1000000)]
date_df = sc.parallelize(million_dates).toDF()

current_timestamp = datetime.datetime.now()
million_timestamps = [Row(some_timestamp=current_timestamp + datetime.timedelta(seconds=i)) for i in range(1000000)]
timestamp_df = sc.parallelize(million_timestamps).toDF()
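
To get a feel for the values these two loops produce, here is a small stdlib-only sketch of the ranges they cover. The fixed start values and the names ROWS, start_date and start_ts are assumptions chosen for reproducibility; the notebook itself starts from today's date and the current time.

```python
import datetime

ROWS = 1_000_000

# Hypothetical fixed starting points (the notebook uses today() and now()).
start_date = datetime.date(2020, 1, 1)
start_ts = datetime.datetime(2020, 1, 1, 0, 0, 0)

# The loops add i days / i seconds for i in range(ROWS), so the last offset is ROWS - 1.
last_date = start_date + datetime.timedelta(days=ROWS - 1)
last_ts = start_ts + datetime.timedelta(seconds=ROWS - 1)

print("date span:     ", start_date, "to", last_date)
print("timestamp span:", start_ts, "to", last_ts)
```

One million consecutive days reach several thousand years into the future, while one million consecutive seconds cover less than two weeks.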

Let's do some verification to make sure that (1) we get the element count that we expect and (2) our schema reflects the correct types:

In [5]:
date_df.count(), date_df.schema, timestamp_df.count(), timestamp_df.schema
Out[5]:
(1000000,
 StructType(List(StructField(some_date,DateType,true))),
 1000000,
 StructType(List(StructField(some_timestamp,TimestampType,true))))

Demonstrating the Performance Impact

These next few cells demonstrate that processing dates, for whatever reason, is far faster than processing timestamps: about 0.7 seconds versus about 18 seconds for the same identity UDF. At this point, we've not narrowed down where this problem lies, but given the low CPU usage while the second test runs, I assume that it is probably due to Spark and not Python.

In [6]:
def date_identity(some_date):
    return some_date

def timestamp_identity(some_dt):
    return some_dt

date_identity_udf = func.udf(date_identity, DateType())
timestamp_identity_udf = func.udf(timestamp_identity, TimestampType())
In [7]:
transformed_date_df = date_df.withColumn('identity', date_identity_udf(col('some_date')))
timeit.timeit(transformed_date_df.count, number=1)
Out[7]:
0.6908604120000064
In [8]:
transformed_timestamp_df = timestamp_df.withColumn('identity', timestamp_identity_udf(col('some_timestamp')))
timeit.timeit(transformed_timestamp_df.count, number=1)
Out[8]:
18.104324765
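
Each measurement above is a single run, so it is sensitive to noise. One possible way to make such timings a little more robust is to repeat the action and keep the fastest run. The helper name best_of is an assumption, and the workload is a stand-in so the sketch runs without Spark; with Spark, repeated runs may also benefit from a warm JVM, so the result is not directly comparable to a cold single run.

```python
import timeit


def best_of(action, repeat=3):
    """Call `action` once per measurement, `repeat` times, and return the fastest time in seconds."""
    return min(timeit.repeat(action, number=1, repeat=repeat))


# Stand-in workload; in the notebook one might pass
# transformed_timestamp_df.count instead.
elapsed = best_of(lambda: sum(range(100_000)))
print(f"best of 3: {elapsed:.6f}s")
```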

Breaking Apart the Marshalling and Unmarshalling

As stated in the accompanying article, we're going to break up the marshalling and unmarshalling of the date and timestamp type to see if we can detect a performance difference.
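
For orientation, here is a stdlib-only sketch of the kind of conversion involved: dates become a count of days since the Unix epoch, and timestamps become a count of microseconds since the epoch. The function names are hypothetical, and the bodies are an assumption about what toInternal() and fromInternal() roughly do, not a copy of the PySpark source.

```python
import calendar
import datetime
import time
import timeit

EPOCH_ORDINAL = datetime.date(1970, 1, 1).toordinal()


def date_to_internal(d):
    # Days since 1970-01-01.
    return d.toordinal() - EPOCH_ORDINAL


def date_from_internal(days):
    return datetime.date.fromordinal(days + EPOCH_ORDINAL)


def timestamp_to_internal(dt):
    # Microseconds since the epoch; naive datetimes are treated as local time.
    if dt.tzinfo is not None:
        seconds = calendar.timegm(dt.utctimetuple())
    else:
        seconds = time.mktime(dt.timetuple())
    return int(seconds) * 1_000_000 + dt.microsecond


def timestamp_from_internal(micros):
    # Returns a naive datetime in local time.
    return datetime.datetime.fromtimestamp(micros // 1_000_000).replace(
        microsecond=micros % 1_000_000)


sample_date = datetime.date(2020, 1, 1)
sample_ts = datetime.datetime(2020, 6, 15, 12, 30, 45, 123456)

# Time each direction in plain Python, outside Spark.
for fn, arg in [
    (date_to_internal, sample_date),
    (date_from_internal, date_to_internal(sample_date)),
    (timestamp_to_internal, sample_ts),
    (timestamp_from_internal, timestamp_to_internal(sample_ts)),
]:
    elapsed = timeit.timeit(lambda: fn(arg), number=100_000)
    print(f"{fn.__name__}: {elapsed:.3f}s per 100,000 calls")
```

Timing these in isolation gives a rough idea of how much work the pure-Python side of each conversion does, separate from whatever Spark adds around it.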

In [9]:
date_type = DateType()
ts_type = TimestampType()

def date_to_int(date):
    return date_type.toInternal(date)

def int_to_date(i):
    return date_type.fromInternal(i)

def timestamp_to_int(timestamp):
    return ts_type.toInternal(timestamp)

def int_to_timestamp(i):
    return ts_type.fromInternal(i)

date_to_int_udf = func.udf(date_to_int, LongType())
int_to_date_udf = func.udf(int_to_date, DateType())
timestamp_to_int_udf = func.udf(timestamp_to_int, LongType())
int_to_timestamp_udf = func.udf(int_to_timestamp, TimestampType())
In [10]:
transformed_date_df = date_df.withColumn('int_repr', date_to_int_udf(col('some_date')))
timeit.timeit(transformed_date_df.count, number=1)
Out[10]:
0.6534145329999888
In [11]:
transformed_date_df = transformed_date_df.cache()
transformed_date_df = transformed_date_df.withColumn('date_from_int', int_to_date_udf(col('int_repr')))
timeit.timeit(transformed_date_df.count, number=1)
Out[11]:
2.8746923990000255
In [12]:
transformed_timestamp_df = timestamp_df.withColumn('int_repr', timestamp_to_int_udf(col('some_timestamp')))
timeit.timeit(transformed_timestamp_df.count, number=1)
Out[12]:
18.128030870999964
In [13]:
transformed_timestamp_df = transformed_timestamp_df.cache()
transformed_timestamp_df = transformed_timestamp_df.withColumn('timestamp_from_int', int_to_timestamp_udf(col('int_repr')))
timeit.timeit(transformed_timestamp_df.count, number=1)
Out[13]:
53.599751571999946
In [14]:
sql_context.clearCache()
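
To put the timings recorded so far in proportion, the following sketch computes how much longer each timestamp test took than its date counterpart. The numbers are copied from the outputs above; the names timings and slowdowns are only illustrative. Keep in mind that these are single runs, and that the two tests run after cache() likely also include the cost of materializing the cached DataFrame.

```python
# Wall-clock seconds copied from the outputs above: (date test, timestamp test).
timings = {
    "identity": (0.6908604120000064, 18.104324765),
    "to_internal": (0.6534145329999888, 18.128030870999964),
    "from_internal": (2.8746923990000255, 53.599751571999946),
}

slowdowns = {}
for step, (date_seconds, timestamp_seconds) in timings.items():
    slowdowns[step] = timestamp_seconds / date_seconds
    print(f"{step}: timestamps took {slowdowns[step]:.1f}x as long as dates")
```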

Testing Alternatives

The goal of this section is to experiment with UDFs that handle dates and timestamps more efficiently.

One thought is that maybe we can write UDFs that receive the timestamp as an integer and return an integer, leaving the conversion to and from the timestamp type to Spark's cast:

In [15]:
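def cast_approach_timestamp_identity(timestamp_as_int):
    # Split the integer into a seconds part and a microseconds part and build
    # a timezone-aware datetime. It must be aware, because subtracting the
    # aware epoch below from a naive datetime raises TypeError.
    dt = datetime.datetime.fromtimestamp(timestamp_as_int // 1000000, tz=datetime.timezone.utc)\
        .replace(microsecond=timestamp_as_int % 1000000)

    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

    # Convert back to the same integer that the function received.
    return int(
        ((dt - epoch) / datetime.timedelta(microseconds=1)))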

cast_approach_timestamp_identity_udf = func.udf(cast_approach_timestamp_identity, LongType())
In [16]:
transformed_timestamp_df = timestamp_df.withColumn('ts_as_int', col('some_timestamp').cast('long'))\
    .withColumn('some_timestamp', cast_approach_timestamp_identity_udf(col('ts_as_int')).cast('timestamp'))
timeit.timeit(transformed_timestamp_df.count, number=1)
Out[16]:
18.29921136900009

Here we can see that the full round trip from timestamp to long and back to timestamp takes about 18 seconds when Spark's cast does the conversion. This is still not very fast (about as fast as timestamp_to_int() on its own), but it is far faster than the 53.6 seconds measured for int_to_timestamp(). If you were to check htop, you'd see there is still clearly contention, but it's an improvement nonetheless.
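
The Python side of this round trip can be checked without Spark. The sketch below, using the hypothetical names int_round_trip and mixing_naive_and_aware_fails, rebuilds a datetime from an integer and converts it back, keeping both datetimes timezone-aware. It also shows why that matters: Python refuses to subtract an aware datetime from a naive one.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def int_round_trip(value):
    # Integer -> aware datetime -> integer, mirroring the arithmetic in the UDF.
    dt = datetime.datetime.fromtimestamp(
        value // 1_000_000, tz=datetime.timezone.utc
    ).replace(microsecond=value % 1_000_000)
    return int((dt - EPOCH) / datetime.timedelta(microseconds=1))


def mixing_naive_and_aware_fails():
    # A naive datetime minus an aware one is rejected by Python.
    naive = datetime.datetime(2020, 1, 1)
    try:
        naive - EPOCH
    except TypeError:
        return True
    return False


for value in (0, 1_500_000_000, 1_500_000_000_123_456):
    assert int_round_trip(value) == value

print("naive minus aware raises TypeError:", mixing_naive_and_aware_fails())
```

Because the function returns exactly the integer it was given, the result does not depend on whether that integer counts seconds or microseconds.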