Here, we perform some standard imports:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, DataFrame, SQLContext, Row
from pyspark.sql.window import Window
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, StringType, LongType, TimestampType
import pyspark.sql.functions as func
import pandas as pd
import datetime
import numpy as np
import timeit
If we re-run the entire notebook, we want to shut down any existing Spark context before creating a new one below:
try:
    sc.stop()
except NameError:
    print("sc not defined.")
Set up our Spark contexts:
conf = SparkConf()
conf.setAppName('Timestamp Performance Demonstration')
conf.set("hive.metastore.pre.event.listeners", "")
sc = SparkContext(conf=conf)
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set('fs.azure', 'org.apache.hadoop.fs.azure.NativeAzureFileSystem')
hadoopConf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoopConf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoopConf.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sql_context = SQLContext(sc)
Finally, something interesting. This block will create two dataframes of one million rows each. The only difference is that one column is a DateType and the other is a TimestampType.
current_date = datetime.date.today()
million_dates = [Row(some_date=current_date + datetime.timedelta(days=i)) for i in range(1000000)]
date_df = sc.parallelize(million_dates).toDF()
current_timestamp = datetime.datetime.now()
million_timestamps = [Row(some_timestamp=current_timestamp + datetime.timedelta(seconds=i)) for i in range(1000000)]
timestamp_df = sc.parallelize(million_timestamps).toDF()
Let's do some verification to make sure that (1) we get the element count that we expect and (2) our schemas reflect the correct types:
date_df.count(), date_df.schema, timestamp_df.count(), timestamp_df.schema
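For reference, here is what we'd expect those schemas to look like (an expectation sketched here, not captured output):
date_df.printSchema()       # expect: some_date: date (nullable = true)
timestamp_df.printSchema()  # expect: some_timestamp: timestamp (nullable = true)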
These next few cells demonstrate that processing dates, for whatever reason, is far faster than processing timestamps. At this point, we've not narrowed down where this problem lies, but given the low CPU utilization while the second test runs, I assume that the bottleneck is probably in Spark and not in Python.
def date_identity(some_date):
    return some_date

def timestamp_identity(some_dt):
    return some_dt
date_identity_udf = func.udf(date_identity, DateType())
timestamp_identity_udf = func.udf(timestamp_identity, TimestampType())
transformed_date_df = date_df.withColumn('identity', date_identity_udf(col('some_date')))
timeit.timeit(transformed_date_df.count, number=1)
transformed_timestamp_df = timestamp_df.withColumn('identity', timestamp_identity_udf(col('some_timestamp')))
timeit.timeit(transformed_timestamp_df.count, number=1)
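As a baseline (a sketch that isn't in the original run), it can also help to time the counts with no UDF at all, which separates the UDF serialization overhead from the cost of simply scanning the data:
# Baseline counts with no UDF involved, to isolate the Python UDF overhead.
timeit.timeit(date_df.count, number=1)
timeit.timeit(timestamp_df.count, number=1)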
As stated in the accompanying article, we're going to break up the marshalling and unmarshalling of the date and timestamp types to see if we can detect a performance difference.
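To illustrate what these internal representations are (a local sketch run on the driver, no Spark involved): DateType marshals to days since the Unix epoch, while TimestampType marshals to microseconds since the epoch (for a naive datetime, the exact value depends on the local time zone).
# Quick local check of the internal representations.
DateType().toInternal(datetime.date(1970, 1, 11))          # 10 -- days since the epoch
TimestampType().toInternal(datetime.datetime(2018, 1, 1))  # microseconds since the epoch (time-zone dependent for naive datetimes)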
date_type = DateType()
ts_type = TimestampType()
def date_to_int(date):
    return date_type.toInternal(date)

def int_to_date(i):
    return date_type.fromInternal(i)

def timestamp_to_int(timestamp):
    return ts_type.toInternal(timestamp)

def int_to_timestamp(i):
    return ts_type.fromInternal(i)
date_to_int_udf = func.udf(date_to_int, LongType())
int_to_date_udf = func.udf(int_to_date, DateType())
timestamp_to_int_udf = func.udf(timestamp_to_int, LongType())
int_to_timestamp_udf = func.udf(int_to_timestamp, TimestampType())
transformed_date_df = date_df.withColumn('int_repr', date_to_int_udf(col('some_date')))
timeit.timeit(transformed_date_df.count, number=1)
transformed_date_df = transformed_date_df.cache()
transformed_date_df = transformed_date_df.withColumn('date_from_int', int_to_date_udf(col('int_repr')))
timeit.timeit(transformed_date_df.count, number=1)
transformed_timestamp_df = timestamp_df.withColumn('int_repr', timestamp_to_int_udf(col('some_timestamp')))
timeit.timeit(transformed_timestamp_df.count, number=1)
transformed_timestamp_df = transformed_timestamp_df.cache()
transformed_timestamp_df = transformed_timestamp_df.withColumn('timestamp_from_int', int_to_timestamp_udf(col('int_repr')))
timeit.timeit(transformed_timestamp_df.count, number=1)
sql_context.clearCache()
The goal of this section is to do some experimentation to see if we can provide UDFs that do a better job of handling dates and timestamps. One thought is that maybe we can write UDFs that operate on the timestamp cast to an integer and return an integer that Spark then casts back to a timestamp:
def cast_approach_timestamp_identity(timestamp_as_int):
    # Keep both datetimes timezone-aware (UTC) so the subtraction below is valid.
    dt = datetime.datetime.fromtimestamp(
        timestamp_as_int // 1000000, tz=datetime.timezone.utc
    ).replace(microsecond=timestamp_as_int % 1000000)
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return int((dt - epoch) / datetime.timedelta(microseconds=1))
cast_approach_timestamp_identity_udf = func.udf(cast_approach_timestamp_identity, LongType())
transformed_timestamp_df = timestamp_df.withColumn('ts_as_int', col('some_timestamp').cast('long'))\
.withColumn('some_timestamp', cast_approach_timestamp_identity_udf(col('ts_as_int')).cast('timestamp'))
timeit.timeit(transformed_timestamp_df.count, number=1)
Here we can see the full round trip from timestamp to long and back to timestamp, and that using cast causes it to take 18 seconds. This is still not very fast (about as fast as timestamp_to_int()), but it's far faster than the test for int_to_timestamp(). If you were to check out htop, you'd see there's still clearly contention, but it's an improvement nonetheless.
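For comparison (a sketch that isn't part of the original notebook, with cast_only_df used purely as an illustrative name), a cast-only round trip with no Python UDF in the loop gives a lower bound for this pipeline:
# Cast-only round trip: timestamp -> long -> timestamp, entirely inside Spark.
cast_only_df = timestamp_df.withColumn('ts_as_int', col('some_timestamp').cast('long'))\
    .withColumn('some_timestamp', col('ts_as_int').cast('timestamp'))
timeit.timeit(cast_only_df.count, number=1)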