Data Ingestion: Pandas/Vaex Presentation

There is a lot of Data

  • 2.5 quintillion bytes of data are created every day (as of 2020)
  • 90% of all data ever created was generated in the last 2 years
  • The total volume of data doubles roughly every 2 years

Ingesting/Analyzing

  • Python/pandas is the de facto standard for data analysis and fast imports
  • Full-featured Excel/SQL-style operations available
  • Machine learning algorithms widely available
  • Built-in data cleansing and statistics functions
  • Easy visualization and sharing (plot/Jupyter); see the sketch below
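
A minimal sketch of that import/cleanse/summarize/plot workflow, assuming a hypothetical measurements.csv with a numeric value column (plotting requires matplotlib):

import pandas

df = pandas.read_csv('measurements.csv')   # fast C-engine import
df = df.dropna()                           # basic cleansing
print(df.describe())                       # Excel/SQL-style summary statistics
df['value'].plot(kind='hist')              # one-line visualization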

File Analysis

Typical file flow

Ingest File -> Processing -> Insights/Advantages

What Is the Problem? Pandas

  • CPU and memory bound (see the sketch below)
  • Designed to run on a single core
  • Multi-core is the standard on all modern computers
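
The memory cost is easy to demonstrate: pandas must parse the entire file into RAM before any row can be touched. A minimal sketch, reusing the epa_data.csv file from the demo script below:

import pandas

df = pandas.read_csv('epa_data.csv')        # whole file materialized in RAM, on one core
resident_gb = df.memory_usage(deep=True).sum() / 1e9
print(f'{resident_gb:.2f} GB resident for a single DataFrame')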

Solution: Enter Vaex

  • Pandas-like core functionality
  • Lazy evaluation (see the sketch below)
  • Memory-mapped files
  • Multi-core native
  • Uses HDF5 for on-disk storage
  • Processes roughly a billion rows per second on an average computer
  • Removes the need for a separate big-data pipeline in many cases
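
Lazy evaluation means Vaex records expressions rather than computing them immediately, and memory-mapped HDF5 lets even billion-row files open near-instantly. A minimal sketch, with a hypothetical file and taxi column names:

import vaex

df = vaex.open('yellow_taxi.hdf5')               # memory-mapped: opens in milliseconds
df['tip_pct'] = df.tip_amount / df.total_amount  # virtual column, nothing computed yet
print(df.mean(df.tip_pct))                       # evaluated here, across all cores

The full demo script from the presentation follows.
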
import asyncio
import contextlib
import os
import subprocess
import time
import uuid
from functools import wraps
from pathlib import Path
from typing import Union

import pandas
import structlog
import vaex
from art import tprint
from pandas.core.frame import DataFrame


log = structlog.get_logger()
root_path = Path(r"c:/temp/rattler")

def async_timeit(func):
    # Await coroutines, call plain functions directly.
    async def process(func, *args, **params):
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **params)
        return func(*args, **params)

    @wraps(func)
    async def helper(*args, **params):
        start = time.perf_counter()
        result = await process(func, *args, **params)
        total_time = time.perf_counter() - start
        log.msg(f'Method: {func.__name__} {args} {params} time {total_time:.4f} seconds')
        return result
    return helper

def timeit(func):
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        total_time = end - start
        log.msg(f'Method: {func.__name__} {args} {kwargs} time {total_time:.4f} seconds')
        return result
    return timeit_wrapper

@timeit
def convert_with_errors(csv_file: Path, hdf5_file: Path) -> DataFrame:
    # Slow path: the forgiving python engine parses the file while bad-line
    # warnings are captured in a side-by-side .errors.txt file.
    with open(csv_file.with_suffix('.errors.txt'), 'w') as error_log:
        with contextlib.redirect_stderr(error_log), contextlib.redirect_stdout(error_log):
            frame = pandas.read_csv(csv_file, engine='python', on_bad_lines='warn')
            result = vaex.from_pandas(frame)
            result.export_hdf5(hdf5_file)
            return result

@timeit
def convert_file(csv_file: Path, hdf5_file: Path) -> DataFrame:
    try:
        # Fast path: let vaex stream the CSV to HDF5 in 100k-row chunks.
        return vaex.from_csv(csv_file, convert=True, chunk_size=100_000)
    except Exception:
        return convert_with_errors(csv_file, hdf5_file)

@timeit
def read_file(csv_file: Path) -> Union[DataFrame, None]:
    if not csv_file.is_file():
        return None

    # Convert once (e.g. data.csv -> data.csv.hdf5) and reuse the HDF5 file on later runs.
    hdf5_file = '.'.join([f'{csv_file}', 'hdf5'])
    if not os.path.exists(hdf5_file):
        convert_file(csv_file, hdf5_file)
    log.msg(f'Opening derived HDF5 file {hdf5_file} from {csv_file}')
    return vaex.open(hdf5_file)

@timeit
def output_results(data: vaex.dataframe.DataFrameLocal, output_file_name: str) -> None:
    data.export_csv(output_file_name)
    try:
        data.plot(data.longitude, data.latitude, f="log", show=True)
    except Exception:
        log.msg("Unable to plot data")

@timeit
def random_csv_file_name(starts_with: str = 'sample') -> str:
    return f'{root_path}/{starts_with}_{uuid.uuid1()}.csv'

@timeit
def unquoted_test() -> None:
    data_set = read_file(Path(f"{root_path}/unquoted_strings.csv"))    
    output_results(data_set, random_csv_file_name('unquoted'))

@timeit
def carriage_returns_extra_test() -> None:
    data_set = read_file(Path(f"{root_path}/carriage_returns_extra_separators.csv"))    
    output_results(data_set, random_csv_file_name('carriage_returns_extra_separators'))

@timeit
def read_file_pandas(csv_file: Path, read_engine='c') -> Union[DataFrame, None]:
    log.msg(f"Reading csv file {csv_file} with pandas parsing engine {read_engine}")
    result = pandas.read_csv(csv_file, engine=read_engine)
    return result

@timeit
def extract_column(data: DataFrame, csv_file: Path, column_name: str) -> Union[DataFrame, None]:
    hdf5_file = '.'.join([f'{csv_file}', column_name, 'hdf5'])
    if os.path.exists(hdf5_file):
        os.remove(hdf5_file)
    extract_csv_file = '.'.join([f'{csv_file}', column_name, 'csv'])
    if os.path.exists(extract_csv_file):
        os.remove(extract_csv_file)
    log.msg(f"Extract {column_name} from csv file {csv_file} into hdf5 file {hdf5_file}")    

    column_values = data[column_name].values
    df = vaex.from_arrays(**{column_name: column_values})  # keep the real column name
    df.export_hdf5(hdf5_file)

    read_df = vaex.open(hdf5_file)
    read_df.export_csv(extract_csv_file)
    return df

@timeit
def plot_poison(results: DataFrame, poison: str) -> None:
    results = results[results.parameter_name.str.contains(poison)]
    output_results(results, random_csv_file_name(poison.lower()))

@timeit
def random_sample(results: DataFrame) -> None:
    sample = results.sample(frac=0.001)  # avoid shadowing the function name
    output_results(sample, random_csv_file_name())

@timeit
def big_file_plot() -> None:
    df = vaex.open(f'{root_path}/yellow_taxi_2009_2015_f32.hdf5')
    log.msg(f'Total rows: {df.shape[0]:,}, columns: {df.shape[1]}')
    # Bounding box roughly covering New York City.
    long_min, long_max = -74.05, -73.75
    lat_min, lat_max = 40.58, 40.90
    df.plot(df.pickup_longitude, df.pickup_latitude, f="log1p",
            limits=[[long_min, long_max], [lat_min, lat_max]], show=True)

@timeit
def pollution_plot(data: DataFrame, state: str, latitude_min: float, latitude_max: float, longitude_min: float, longitude_max: float) -> None:
    log.msg(f'Plotting {state}: total rows: {data.shape[0]:,}, columns: {data.shape[1]}')
    data.plot(data.longitude, data.latitude, f="log1p",
              limits=[[longitude_min, longitude_max], [latitude_min, latitude_max]], show=True)

@timeit
def pollution_places(data: DataFrame) -> None:
    states = [('CA', 32.534156, 42.00951, -124.409591, -114.131211)] #, ('TX', 25.8737377, 36.50070, -106.645646, -93.508292)]
    for plot_state in states:
        (state, latitude_min, latitude_max, longitude_min, longitude_max) = plot_state
        pollution_plot(data, state, latitude_min, latitude_max, longitude_min, longitude_max)

@timeit
def pollution_plot1d(data: DataFrame, fields: list) -> None:
    log.msg(f'Total rows: {data.shape[0]:,}, columns: {data.shape[1]}')
    # groupby alone returns an unevaluated GroupBy object; ask for a count aggregation.
    pollutants = data.groupby(fields, agg='count')
    log.msg(pollutants)


if __name__ == '__main__':
    log.msg(f'Current process {os.getpid()}')
    subprocess.Popen(f'explorer {root_path}')
    os.system("pause")
    
    csv_file = Path(f"{root_path}/epa_data.csv")

    tprint('pandas', font="doh")
    pandas_c_results = read_file_pandas(csv_file)
    #pandas_python_results = read_file_pandas(csv_file, read_engine='python')
    os.system("pause")

    tprint('vaex', font="doh")
    results = read_file(csv_file)
    log.msg(f'Rows loaded {len(results):,}')
    log.msg(f'Columns {list(results.columns)}')

    os.system("pause")
    carriage_returns_extra_test()
    unquoted_test()
    
    os.system("pause")    
    pollution_places(results)

    os.system("pause")
    random_sample(results)

    os.system("pause")
    extract_column(results, csv_file, "city_name")

    os.system("pause")
    plot_poison(results, 'Benzene')
    #plot_poison(results, 'chloride')    


    tprint('big file', font="doh")
    os.system("pause")
    big_file_plot()

    #pollution_plot1d(results, ['parameter_name', 'state_name'])
