Language phenotypes: SQL - Part 4

Doing a second part on this now where I do the same as Part 3 SQL with python and sql, again the core of the solving of the problem is using set functionality in SQL.

Again this will be  CoderPad compatible

Problem 1: Contains All

Normally you would you fix up your data while looping through it but going to do some set manipulation with a group by (postgres + python)

Part 1: List checks

Do check on the size of the lists for the special cases

Inputs: [None] Supply: [] means true
Inputs: [] Supply: [None] means true as well

If either case comes up say true and exit method otherwise it's time to matching and group by of below

Part 2: SQL

  • Create two temp tables
  • Load the inputs and supply lists into the tables
  • Create two subqueries that group by the input text to get a key and count
  • Left join inputs -> supply so we can make sure every input key will match an expected supply key
  • Read the counts (coalesced to 0 if none) and run through each record of the resultset to make sure that supply count is greater than input count

Function prototype is

def contains_all(inputs: list, supply: list) -> bool:

Test sets:

print(contains_all([], ['a', 'b', 'c']) is True)
print(contains_all(['a'], ['a']) is True)
print(contains_all(['a'], ['b']) is False)
print(contains_all(['a', 'b', 'c'], ['b', 'a', 'c']) is True)
print(contains_all(['a', 'b', 'c'], ['a', 'b']) is False)
print(contains_all(['a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) is True)
print(contains_all(['a', 'b', 'c'], ['q', 'r', 'n', 'a', 'c']) is False)
print(contains_all(['a', 'a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) is False)
print(contains_all(['a', 'a', 'b', 'c'], ['b', 'a', 'a', 'a', 'c']) is True)
print(contains_all([None], []) is True)
print(contains_all([], [None]) is True)
print(contains_all([None, 'a', None], [None, None, 'a']) is True)
print(contains_all([None, 'a', None, 'b'], [None, None, 'a']) is False)

The SQL looks like this

DROP TABLE IF EXISTS #inputs
CREATE TABLE #inputs (CompareValue NVARCHAR(100) NULL)

DROP TABLE IF EXISTS #supply
CREATE TABLE #supply (CompareValue NVARCHAR(100) NULL)

INSERT INTO [#inputs] ([CompareValue])
VALUES
(N'a'),
(N'a'),
(N'b'),
(NULL)

INSERT INTO [#supply] ([CompareValue])
VALUES
(N'a'),
(N'a'),
(N'a'),
(N'b'),
(NULL)

SELECT i.CompareValue, s.CompareValue, COALESCE(InputCount, 0), COALESCE(SupplyCount, 0)
FROM
(
	SELECT [CompareValue], COUNT(*) InputCount
	FROM [#inputs]
	GROUP BY CompareValue
) i
LEFT JOIN
(
	SELECT [CompareValue], COUNT(*) SupplyCount
	FROM [#supply]
	GROUP BY CompareValue
) s
ON i.[CompareValue] = s.[CompareValue] OR (i.[CompareValue] IS NULL AND s.[CompareValue] IS NULL)

Example: ['a'] = ['a']

  1. Do the simple check for None and empty lists
  2. Create and load the comparison tables {inputs, supply}
  3. Run the raw sql query to build out the results table
  4. Read each result row tuple into variables and compare if the input_count > supply_count then fail the matches and return
  5. If no failure then all inputs and counts are at least in the supply set

Python Code compatible with Postgres using SqlAlchemy and will run in CoderPad.io

import sqlalchemy
from sqlalchemy.orm import sessionmaker


def contains_all(inputs, supply):
    if len(inputs) == 0:
        return True
    if len(supply) == 0 and len(inputs) == 1 and None in inputs:
        return True
    engine = sqlalchemy.create_engine('postgresql://coderpad:@/coderpad?host=/tmp/postgresql/socket')
    connection = engine.connect()
    # Connect to the DB and reflect metadata.
    Session = sessionmaker(bind=engine)
    session = Session()
    metadata = sqlalchemy.MetaData()
    metadata.reflect(bind=engine)

    metadata.clear()
    inputs_name = 'inputs'
    supply_name = 'supply'

    inputs_table = sqlalchemy.Table(inputs_name, metadata, sqlalchemy.Column('CompareValue', sqlalchemy.String(100)))
    if not sqlalchemy.inspect(engine).has_table(inputs_name):    
        inputs_table.create(engine)
    session.query(inputs_table).delete()
    session.commit()

    supply_table = sqlalchemy.Table(supply_name, metadata, sqlalchemy.Column('CompareValue', sqlalchemy.String(100)))   
    if not sqlalchemy.inspect(engine).has_table(supply_name):
        supply_table.create(engine)
    session.query(supply_table).delete()
    session.commit()

    engine.execute(inputs_table.insert(), list({'CompareValue': i} for i in inputs))
    engine.execute(supply_table.insert(), list({'CompareValue': i} for i in supply))
    
    result = connection.execute("""SELECT i."CompareValue", s."CompareValue", COALESCE(i."InputCount", 0) Inputs, COALESCE(s."SupplyCount", 0) Outputs
FROM
(
    SELECT "CompareValue", COUNT(*) "InputCount"
    FROM inputs
    GROUP BY "CompareValue"
) i
LEFT JOIN
(
    SELECT "CompareValue", COUNT(*) "SupplyCount"
    FROM supply
    GROUP BY "CompareValue"
) s
ON i."CompareValue" = s."CompareValue" OR (i."CompareValue" IS NULL AND s."CompareValue" IS NULL)""")
    
    for result_row in result.fetchall():
        input_value, supply_value, input_count, supply_count = result_row
        if input_count > supply_count:
            return False
    return True


print(contains_all([], ['a', 'b', 'c']) == True)
print(contains_all(['a'], ['a']) == True)
print(contains_all(['a'], ['b']) == False)
print(contains_all(['a', 'b', 'c'], ['b', 'a', 'c']) == True)
print(contains_all(['a', 'b', 'c'], ['a', 'b']) == False)
print(contains_all(['a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) == True)
print(contains_all(['a', 'b', 'c'], ['q', 'r', 'n', 'a', 'c']) == False)
print(contains_all(['a', 'a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) == False)
print(contains_all(['a', 'a', 'b', 'c'], ['b', 'a', 'a', 'a', 'c']) == True)
print(contains_all([None], []) == True)
print(contains_all([], [None]) == True)
print(contains_all([None, 'a', None], [None, None, 'a']) == True)
print(contains_all([None, 'a', None, 'b'], [None, None, 'a']) == False)

And here is a version you can use locally using sqlite3 raw sql and an in memory database.  I am getting entity framework vibes off of sql alchemy and need to find a more Dapper like library for python SQL

import sqlite3


def create_and_load_table(db_connection: sqlite3.Connection, table_name: str, values: list):
    cur = db_connection.cursor()
    cur.execute(f"DROP TABLE IF EXISTS {table_name}")
    cur.execute(f"CREATE TABLE {table_name} (CompareValue NVARCHAR(100) NULL)")
    if len(values) > 0:
        cur.executemany(f"INSERT INTO {table_name} (CompareValue) VALUES (?)",
                        [x if x is not None else (None,) for x in values])


def contains_all(inputs: list, supply: list) -> bool:
    if len(inputs) == 0:
        return True
    if len(supply) == 0 and len(inputs) == 1 and None in inputs:
        return True
    con = sqlite3.connect(':memory:')
    create_and_load_table(con, 'inputs', inputs)
    create_and_load_table(con, 'supply', supply)
    cursor = con.cursor()
    cursor.execute("""
SELECT i.CompareValue, s.CompareValue, COALESCE(i.InputCount, 0) Inputs, COALESCE(s.SupplyCount, 0) Outputs
FROM
(
	SELECT CompareValue, COUNT(*) InputCount
	FROM inputs
	GROUP BY CompareValue
) i
LEFT JOIN
(
	SELECT CompareValue, COUNT(*) SupplyCount
	FROM supply
	GROUP BY CompareValue
) s
ON i.CompareValue = s.CompareValue OR (i.CompareValue IS NULL AND s.CompareValue IS NULL)    
    """)

    for result_row in cursor.fetchall():
        input_value, supply_value, input_count, supply_count = result_row
        if input_count > supply_count:
            return False
    return True


if __name__ == '__main__':
    print(contains_all([], ['a', 'b', 'c']) is True)
    print(contains_all(['a'], ['a']) is True)
    print(contains_all(['a'], ['b']) is False)
    print(contains_all(['a', 'b', 'c'], ['b', 'a', 'c']) is True)
    print(contains_all(['a', 'b', 'c'], ['a', 'b']) is False)
    print(contains_all(['a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) is True)
    print(contains_all(['a', 'b', 'c'], ['q', 'r', 'n', 'a', 'c']) is False)
    print(contains_all(['a', 'a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) is False)
    print(contains_all(['a', 'a', 'b', 'c'], ['b', 'a', 'a', 'a', 'c']) is True)
    print(contains_all([None], []) is True)
    print(contains_all([], [None]) is True)
    print(contains_all([None, 'a', None], [None, None, 'a']) is True)
    print(contains_all([None, 'a', None, 'b'], [None, None, 'a']) is False)

Why would you do this?  

It bypasses the can't use external libraries code thing coderpad says they do and pushes you to think of this operation in a set theory format which is more functional and delivering fast performance and would be the way to make this code faster.  Take that algorithm and find it.  SQL has far superior dynamic algorithms for doing this matching in code, why rewrite for a dumb test and production code increases in speed and reduces costs by moving the code as close to the data as you feel comfortable.

I apologize for nothing and if I get another one of these questions I am going to pull this technique out to make someone's head explode with

Them: You can't do that. He's doing it, wait it works

Me: Yeah, you want to make stuff scream you do this with indexes etc because the problem you asked is a data matching problem