Data Ingestion: Pandas/Vaex Presentation

There is a lot of Data

  • 2.5 quintillion bytes of data are created every day (2020)
  • 90% of all the data ever created was created in the last 2 years
  • Volume of data is doubling every 2 years


  • Python/Pandas is the de facto standard for data analysis and fast import
  • Full-featured Excel/SQL-like functions available
  • Machine learning algorithms widely available
  • Cleansing/Statistics actions
  • Easy visualizations and sharing (plot/jupyter)

File Analysis

Typical file flow

Ingest File -> Processing -> Insights/Advantages

What is the problem? Pandas

  • CPU and memory bound
  • Designed to work on single core
  • Multicore is the standard for all modern computers

Solution: Enter Vaex

  • Pandas core functionality
  • Lazy evaluation
  • Memory mapped files
  • Multi core native
  • Uses HDF5
  • Billion rows per second on average computer
  • Eliminates the need for big data pipelines
import asyncio
import contextlib
import os.path
import re
import subprocess
import time
import uuid
from functools import wraps
from logging import root
from pathlib import Path, WindowsPath
from pprint import pprint
from typing import Union

import numpy
import pandas
import structlog
import tables
import vaex
from art import *
from pandas.core.frame import DataFrame
from tabulate import tabulate
from vaex import column

# Module-wide structured logger; all functions below log through this.
log = structlog.get_logger()
# Base directory for all demo input/output files (Windows-style path).
root_path = Path(r"c:/temp/rattler")

def async_timeit(func):
    """Decorator that logs the wall-clock duration of an (a)sync callable.

    The wrapped callable is awaited when it is a coroutine function and
    called directly otherwise; the elapsed time is logged via the module
    structlog logger.
    """
    async def process(func, *args, **params):
        # Await coroutine functions; plain callables are invoked directly.
        # (The original had the plain call unreachable after the return.)
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **params)
        return func(*args, **params)

    @wraps(func)  # preserve the wrapped function's name/docstring
    async def helper(*args, **params):
        start = time.perf_counter()
        result = await process(func, *args, **params)
        end = time.perf_counter()
        total_time = end - start
        # Original referenced an undefined `kwargs`; the kwargs dict here
        # is named `params`.
        log.msg(f'Method: {func.__name__} {args} {params} time {total_time:.4f} seconds')
        return result
    return helper

def timeit(func):
    """Decorator that logs the wall-clock duration of a synchronous call."""
    @wraps(func)  # `wraps` was imported but never applied; keep metadata
    def timeit_wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        total_time = end - start
        log.msg(f'Method: {func.__name__} {args} {kwargs} time {total_time:.4f} seconds')
        return result
    return timeit_wrapper

def convert_with_errors(csv_file: Path, hdf5_file: Path) -> DataFrame:
    """Convert a malformed CSV to HDF5, capturing parser warnings to a file.

    Reads with pandas' tolerant ``python`` engine (``on_bad_lines='warn'``)
    and redirects stdout/stderr into ``<csv_file>.errors.txt`` so the bad-line
    warnings are preserved alongside the source file.

    Returns the resulting vaex DataFrame (the original returned the result of
    ``export_hdf5``, i.e. None, despite the DataFrame annotation).
    """
    # Do not name the file handle `log` — that shadowed the module logger.
    with open(csv_file.with_suffix('.errors.txt'), 'w') as error_sink:
        with contextlib.redirect_stderr(error_sink):
            with contextlib.redirect_stdout(error_sink):
                frame = pandas.read_csv(csv_file, engine='python', on_bad_lines='warn')
                result = vaex.from_pandas(frame)
                result.export_hdf5(hdf5_file)
                return result

def convert_file(csv_file: Path, hdf5_file: Path) -> DataFrame:
    """Convert a CSV file to HDF5 using vaex's streaming converter.

    The original left the ``convert_with_errors`` call as unreachable dead
    code after the return; it reads as an intended fallback for malformed
    files, so it is wired up as one here — TODO confirm intent.
    """
    try:
        return vaex.from_csv(csv_file, convert=True, chunk_size=100_000)
    except Exception:
        # Fall back to the tolerant pandas-based conversion.
        return convert_with_errors(csv_file, hdf5_file)

def read_file(csv_file: Path) -> Union[DataFrame, None]:
    """Open *csv_file* via a derived HDF5 file, converting on first use.

    Returns the opened vaex DataFrame, or None when the CSV does not exist.
    (The original compared the bound methods ``is_file``/``exists`` to False
    without calling them — the guard could never fire — and never returned
    the frame even though ``main`` calls ``len(results)`` on it.)
    """
    if not csv_file.is_file():
        return None

    # Derived name is "<name>.csv.hdf5", matching extract_column's scheme.
    hdf5_file = '.'.join([f'{csv_file}', 'hdf5'])
    if not os.path.exists(hdf5_file):
        convert_file(csv_file, hdf5_file)
    log.msg(f'Opening derived HDF5 file {hdf5_file} from {csv_file}')
    return vaex.open(hdf5_file)

def output_results(data: vaex.vaex.dataframe.DataFrameLocal, output_file_name: str) -> None:
    """Render a longitude/latitude density plot of *data*.

    ``output_file_name`` is currently unused (a CSV export was commented
    out in the original); kept for interface compatibility.
    """
    try:
        data.plot(data.longitude, data.latitude, f="log", show=True)
    except Exception:
        # Plotting can fail (e.g. headless session); the original logged this
        # message unconditionally — it only makes sense on failure.
        log.msg("Unable to plot data")

def random_csv_file_name(starts_with: str = 'sample') -> str:
    """Build a unique CSV path under ``root_path`` prefixed with *starts_with*."""
    unique_id = uuid.uuid1()
    return f'{root_path}/{starts_with}_{unique_id}.csv'

def unquoted_test() -> None:
    """Exercise ingestion of a CSV containing unquoted strings."""
    source = Path(f"{root_path}/unquoted_strings.csv")
    output_results(read_file(source), random_csv_file_name('unquoted'))

def carriage_returns_extra_test() -> None:
    """Exercise ingestion of a CSV with stray carriage returns/separators."""
    source = Path(f"{root_path}/carriage_returns_extra_separators.csv")
    output_results(read_file(source), random_csv_file_name('carriage_returns_extra_separators'))

def read_file_pandas(csv_file: Path, read_engine='c') -> Union[DataFrame, None]:
    """Read *csv_file* with pandas using the requested parsing engine."""
    log.msg(f"Reading csv file {csv_file} with pandas parsing engine {read_engine}")
    return pandas.read_csv(csv_file, engine=read_engine)

def extract_column(data: DataFrame, csv_file: Path, column_name: str) -> Union[DataFrame, None]:
    """Extract a single column from *data* into its own HDF5 file.

    NOTE(review): the original block was syntactically invalid — two ``if``
    statements with no body and a dangling ``read_df =`` assignment. This is
    a reconstruction of the apparent intent (skip when extract files already
    exist, otherwise export the column) — confirm against the demo notes.
    """
    hdf5_file = '.'.join([f'{csv_file}', column_name, 'hdf5'])
    extract_csv_file = '.'.join([f'{csv_file}', column_name, 'csv'])
    # Skip the work if a previous run already produced the extracts.
    if os.path.exists(hdf5_file) and os.path.exists(extract_csv_file):
        return None
    log.msg(f"Extract {column_name} from csv file {csv_file} into hdf5 file {hdf5_file}")

    column_values = data[column_name].values
    df = vaex.from_arrays(column=column_values)
    df.export_hdf5(hdf5_file)
    return df

def plot_poison(results: DataFrame, poison: str) -> None:
    """Filter rows whose parameter_name contains *poison* and plot them."""
    filtered = results[results.parameter_name.str.contains(poison)]
    # poison.lower() is the idiomatic spelling of str.lower(poison).
    output_results(filtered, random_csv_file_name(poison.lower()))

def random_sample(results: DataFrame) -> None:
    """Plot a 0.1% random sample of *results*."""
    # Don't shadow the function's own name with the sampled frame.
    sampled = results.sample(frac=0.001)
    output_results(sampled, random_csv_file_name())

def big_file_plot() -> None:
    """Plot NYC taxi pickup density from a large pre-built HDF5 file.

    NOTE(review): the original ``df ='{root_path}/...')`` line was
    syntactically invalid; reconstructed as ``vaex.open`` with an f-string
    path — confirm the intended file name.
    """
    #df = vaex.open(f'{root_path}/yellow_taxi_2015_f32s.hdf5')
    df = vaex.open(f'{root_path}/yellow_taxi_2009_2015_f32.hdf5')
    log.msg(f'Total rows: {df.shape[0]:,}, columns: {df.shape[1]}')
    # Bounding box roughly covering New York City.
    long_min = -74.05
    long_max = -73.75
    lat_min = 40.58
    lat_max = 40.90
    df.plot(df.pickup_longitude, df.pickup_latitude, f="log1p",
            limits=[[long_min, long_max], [lat_min, lat_max]], show=True)

def pollution_plot(data: DataFrame, state: str, latitude_min: float, latitude_max: float, longitude_min: float, longitude_max: float) -> None:    
    """Plot a log-scaled density map of *data* within the given bounding box.

    ``state`` is accepted for interface compatibility but not used here.
    """
    log.msg(f'Total rows: {data.shape[0]:,}, columns: {data.shape[1]}')
    bounding_box = [[longitude_min, longitude_max], [latitude_min, latitude_max]]
    data.plot(data.longitude, data.latitude, f="log1p", limits=bounding_box, show=True)

def pollution_places(data: DataFrame) -> None:
    """Render a pollution density plot for each configured state bounding box."""
    # Tuples are (state, lat_min, lat_max, long_min, long_max).
    states = [('CA', 32.534156, 42.00951, -124.409591, -114.131211)] #, ('TX', 25.8737377, 36.50070, -106.645646, -93.508292)]
    for state, lat_lo, lat_hi, long_lo, long_hi in states:
        pollution_plot(data, state, lat_lo, lat_hi, long_lo, long_hi)

def pollution_plot1d(data: DataFrame, fields: list) -> None:    
    """Log frame dimensions and group records by *fields*.

    NOTE(review): the grouped result is computed but never used — the
    function appears unfinished.
    """
    log.msg(f'Total rows: {data.shape[0]:,}, columns: {data.shape[1]}')
    grouped = data.groupby(fields)

if __name__ == '__main__':
    log.msg(f'Current process {os.getpid()}')
    # Pass argv as a list (shell=False) rather than a single string — avoids
    # shell interpretation of the path.
    subprocess.Popen(['explorer', str(root_path)])
    csv_file = Path(f"{root_path}/epa_data.csv")

    # Demo 1: load with pandas (single-core, memory bound).
    tprint('pandas', font="doh")
    pandas_c_results = read_file_pandas(csv_file)
    #pandas_python_results = read_file_pandas(csv_file, read_engine='python')

    # Demo 2: load with vaex via memory-mapped HDF5.
    tprint('vaex', font="doh")
    results = read_file(csv_file)
    log.msg(f'Rows loaded {len(results)}')  # fixed "loadeds" typo
    log.msg(f'Columns {list(results.columns)}')



    extract_column(results, csv_file, "city_name")

    plot_poison(results, 'Benzene')
    #plot_poison(results, 'chloride')    

    tprint('big file', font="doh")

    #pollution_plot1d(results, ['parameter_name', 'state_name'])

    #print(tabulate(random_sample, headers='keys', tablefmt='psql'))
    #first_n = df.head(10)
    #for row in texas:

    #state_values = df['state_name'].unique()
    #print(f'Unique values in state_name {state_values}')

    #texas = df["state_name"] == "Texas"
    #print(f'Rows loaded {len(texas)}')

    #texas = df[df.state_name == "Texas"]
    #output_results(texas, random_csv_file_name('texas'))