Language phenotypes: SQL - Part 4
This is a follow-up to Part 3: I solve the same kind of problem as Part 3's SQL post using Python and SQL, and again the core of the solution is using set functionality in SQL.
Again this will be CoderPad compatible
Problem 1: Contains All
Normally you would fix up your data while looping through it, but here I am going to do some set manipulation with a GROUP BY instead
(postgres + python)
Part 1: List checks
Do check on the size of the lists for the special cases
Inputs: [None] Supply: [] means true
Inputs: [] Supply: [None] means true as well
If either case comes up, return true and exit the method; otherwise move on to the matching and GROUP BY described below
Part 2: SQL
- Create two temp tables
- Load the inputs and supply lists into the tables
- Create two subqueries that group by the input text to get a key and count
- Left join inputs -> supply so we can make sure every input key will match an expected supply key
- Read the counts (coalesced to 0 if none) and run through each record of the resultset to make sure that the supply count is at least equal to the input count
Function prototype is
def contains_all(inputs: list, supply: list) -> bool:
Test sets:
print(contains_all([], ['a', 'b', 'c']) is True)
print(contains_all(['a'], ['a']) is True)
print(contains_all(['a'], ['b']) is False)
print(contains_all(['a', 'b', 'c'], ['b', 'a', 'c']) is True)
print(contains_all(['a', 'b', 'c'], ['a', 'b']) is False)
print(contains_all(['a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) is True)
print(contains_all(['a', 'b', 'c'], ['q', 'r', 'n', 'a', 'c']) is False)
print(contains_all(['a', 'a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) is False)
print(contains_all(['a', 'a', 'b', 'c'], ['b', 'a', 'a', 'a', 'c']) is True)
print(contains_all([None], []) is True)
print(contains_all([], [None]) is True)
print(contains_all([None, 'a', None], [None, None, 'a']) is True)
print(contains_all([None, 'a', None, 'b'], [None, None, 'a']) is False)
The SQL looks like this
-- Scratch tables: a single nullable text column each; NULL rows take part in matching.
DROP TABLE IF EXISTS #inputs
CREATE TABLE #inputs (CompareValue NVARCHAR(100) NULL)
DROP TABLE IF EXISTS #supply
CREATE TABLE #supply (CompareValue NVARCHAR(100) NULL)
-- Sample data: inputs is the multiset we need to find, supply is what we have.
INSERT INTO [#inputs] ([CompareValue])
VALUES
(N'a'),
(N'a'),
(N'b'),
(NULL)
INSERT INTO [#supply] ([CompareValue])
VALUES
(N'a'),
(N'a'),
(N'a'),
(N'b'),
(NULL)
-- Compare per-value counts: each row pairs an input value with its supply count
-- (COALESCE turns a missed join into 0 so unmatched inputs are visible).
SELECT i.CompareValue, s.CompareValue, COALESCE(InputCount, 0), COALESCE(SupplyCount, 0)
FROM
(
-- Collapse the inputs list to (value, occurrence count).
SELECT [CompareValue], COUNT(*) InputCount
FROM [#inputs]
GROUP BY CompareValue
) i
LEFT JOIN
(
-- Collapse the supply list the same way.
SELECT [CompareValue], COUNT(*) SupplyCount
FROM [#supply]
GROUP BY CompareValue
) s
-- Plain equality never matches NULL = NULL, so two NULLs are treated as equal explicitly.
ON i.[CompareValue] = s.[CompareValue] OR (i.[CompareValue] IS NULL AND s.[CompareValue] IS NULL)
Example: ['a'] = ['a']
- Do the simple check for
None
and empty lists - Create and load the comparison tables
{inputs, supply}
- Run the raw sql query to build out the results table
- Read each result row tuple into variables and compare if the
input_count > supply_count
then fail the match and return - If there is no failure then all
inputs and counts
are covered by the supply set
Python Code compatible with Postgres using SqlAlchemy and will run in CoderPad.io
import sqlalchemy
from sqlalchemy.orm import sessionmaker
def contains_all(inputs: list, supply: list) -> bool:
    """Return True when every element of ``inputs`` appears in ``supply``
    at least as many times (multiset containment, None matching None),
    by grouping and LEFT JOINing the two lists in Postgres.

    NOTE(review): targets the CoderPad-local Postgres socket; the engine,
    connection and session are never closed, and the string-based
    ``connection.execute``/``engine.execute`` calls are SQLAlchemy 1.x
    style (removed in 2.0) -- confirm the SQLAlchemy version in use.
    """
    # Special cases resolved without touching the database:
    # an empty inputs list is trivially contained...
    if len(inputs) == 0:
        return True
    # ...and the post defines contains_all([None], []) as True.
    if len(supply) == 0 and len(inputs) == 1 and None in inputs:
        return True
    engine = sqlalchemy.create_engine('postgresql://coderpad:@/coderpad?host=/tmp/postgresql/socket')
    connection = engine.connect()
    # Connect to the DB and reflect metadata.
    Session = sessionmaker(bind=engine)
    session = Session()
    metadata = sqlalchemy.MetaData()
    metadata.reflect(bind=engine)
    metadata.clear()
    inputs_name = 'inputs'
    supply_name = 'supply'
    # Create each scratch table on first use; otherwise delete its rows so
    # data from an earlier call cannot leak into this comparison.
    inputs_table = sqlalchemy.Table(inputs_name, metadata, sqlalchemy.Column('CompareValue', sqlalchemy.String(100)))
    if not sqlalchemy.inspect(engine).has_table(inputs_name):
        inputs_table.create(engine)
    session.query(inputs_table).delete()
    session.commit()
    supply_table = sqlalchemy.Table(supply_name, metadata, sqlalchemy.Column('CompareValue', sqlalchemy.String(100)))
    if not sqlalchemy.inspect(engine).has_table(supply_name):
        supply_table.create(engine)
    session.query(supply_table).delete()
    session.commit()
    # Bulk-insert both lists; a Python None becomes a SQL NULL row.
    engine.execute(inputs_table.insert(), list({'CompareValue': i} for i in inputs))
    engine.execute(supply_table.insert(), list({'CompareValue': i} for i in supply))
    # Group each side to (value, count) and LEFT JOIN inputs -> supply,
    # treating NULL = NULL as a match; COALESCE turns a missed join into 0.
    result = connection.execute("""SELECT i."CompareValue", s."CompareValue", COALESCE(i."InputCount", 0) Inputs, COALESCE(s."SupplyCount", 0) Outputs
FROM
(
SELECT "CompareValue", COUNT(*) "InputCount"
FROM inputs
GROUP BY "CompareValue"
) i
LEFT JOIN
(
SELECT "CompareValue", COUNT(*) "SupplyCount"
FROM supply
GROUP BY "CompareValue"
) s
ON i."CompareValue" = s."CompareValue" OR (i."CompareValue" IS NULL AND s."CompareValue" IS NULL)""")
    for result_row in result.fetchall():
        input_value, supply_value, input_count, supply_count = result_row
        # Any value demanded more times than it is supplied fails the match.
        if input_count > supply_count:
            return False
    return True
# Consistency fix: compare with `is True` / `is False` like the test listing
# earlier in the post and the sqlite runner below.  contains_all returns the
# bool singletons, so the printed output is unchanged.
print(contains_all([], ['a', 'b', 'c']) is True)
print(contains_all(['a'], ['a']) is True)
print(contains_all(['a'], ['b']) is False)
print(contains_all(['a', 'b', 'c'], ['b', 'a', 'c']) is True)
print(contains_all(['a', 'b', 'c'], ['a', 'b']) is False)
print(contains_all(['a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) is True)
print(contains_all(['a', 'b', 'c'], ['q', 'r', 'n', 'a', 'c']) is False)
print(contains_all(['a', 'a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c']) is False)
print(contains_all(['a', 'a', 'b', 'c'], ['b', 'a', 'a', 'a', 'c']) is True)
print(contains_all([None], []) is True)
print(contains_all([], [None]) is True)
print(contains_all([None, 'a', None], [None, None, 'a']) is True)
print(contains_all([None, 'a', None, 'b'], [None, None, 'a']) is False)
And here is a version you can run locally using sqlite3, raw SQL and an in-memory database. I am getting Entity Framework vibes from SQLAlchemy and need to find a more Dapper-like library for Python SQL
import sqlite3
def create_and_load_table(db_connection: sqlite3.Connection, table_name: str, values: list) -> None:
    """(Re)create ``table_name`` with a single nullable CompareValue column
    and bulk-insert ``values``; a Python None becomes a SQL NULL row.

    NOTE: ``table_name`` is interpolated into the SQL, so it must be a
    trusted identifier, never user input.
    """
    cur = db_connection.cursor()
    cur.execute(f"DROP TABLE IF EXISTS {table_name}")
    cur.execute(f"CREATE TABLE {table_name} (CompareValue NVARCHAR(100) NULL)")
    if len(values) > 0:
        # BUG FIX: executemany needs one parameter *tuple* per row.  The old
        # code passed bare strings for non-None values; sqlite3 treats a str
        # as a sequence of characters, so it only worked by accident for
        # 1-character values and raised ProgrammingError for anything longer.
        cur.executemany(f"INSERT INTO {table_name} (CompareValue) VALUES (?)",
                        [(value,) for value in values])


def contains_all(inputs: list, supply: list) -> bool:
    """Return True when every element of ``inputs`` appears in ``supply``
    at least as many times (multiset containment); None matches None.
    """
    # Special cases resolved without touching the database:
    # an empty inputs list is trivially contained...
    if len(inputs) == 0:
        return True
    # ...and the post defines contains_all([None], []) as True.
    if len(supply) == 0 and len(inputs) == 1 and None in inputs:
        return True
    con = sqlite3.connect(':memory:')
    try:
        create_and_load_table(con, 'inputs', inputs)
        create_and_load_table(con, 'supply', supply)
        # Group each side to (value, count) and LEFT JOIN inputs -> supply,
        # treating NULL = NULL as a match; COALESCE turns a missed join into 0.
        cursor = con.cursor()
        cursor.execute("""
SELECT i.CompareValue, s.CompareValue, COALESCE(i.InputCount, 0) Inputs, COALESCE(s.SupplyCount, 0) Outputs
FROM
(
SELECT CompareValue, COUNT(*) InputCount
FROM inputs
GROUP BY CompareValue
) i
LEFT JOIN
(
SELECT CompareValue, COUNT(*) SupplyCount
FROM supply
GROUP BY CompareValue
) s
ON i.CompareValue = s.CompareValue OR (i.CompareValue IS NULL AND s.CompareValue IS NULL)
""")
        for result_row in cursor.fetchall():
            input_value, supply_value, input_count, supply_count = result_row
            # Any value demanded more times than it is supplied fails the match.
            if input_count > supply_count:
                return False
        return True
    finally:
        # BUG FIX: the original leaked the connection on every call,
        # including the early `return False` path.
        con.close()
if __name__ == '__main__':
    # Each case is (inputs, supply, expected); every line prints True when
    # contains_all agrees with the expected result.
    cases = [
        ([], ['a', 'b', 'c'], True),
        (['a'], ['a'], True),
        (['a'], ['b'], False),
        (['a', 'b', 'c'], ['b', 'a', 'c'], True),
        (['a', 'b', 'c'], ['a', 'b'], False),
        (['a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c'], True),
        (['a', 'b', 'c'], ['q', 'r', 'n', 'a', 'c'], False),
        (['a', 'a', 'b', 'c'], ['b', 'q', 'r', 'n', 'a', 'c'], False),
        (['a', 'a', 'b', 'c'], ['b', 'a', 'a', 'a', 'c'], True),
        ([None], [], True),
        ([], [None], True),
        ([None, 'a', None], [None, None, 'a'], True),
        ([None, 'a', None, 'b'], [None, None, 'a'], False),
    ]
    for case_inputs, case_supply, expected in cases:
        print(contains_all(case_inputs, case_supply) is expected)
Why would you do this?
It bypasses the "can't use external libraries" restriction CoderPad says it enforces, and it pushes you to think of this operation in set-theory terms, which is more functional, delivers fast performance, and is the way you would actually make this code faster. Take that algorithm and find it in a library instead of rewriting it. SQL already has far superior, battle-tested algorithms for doing this kind of matching, so why rewrite them by hand for a contrived test? In production, moving the code as close to the data as you feel comfortable increases speed and reduces cost.
I apologize for nothing and if I get another one of these questions I am going to pull this technique out to make someone's head explode with
Them: You can't do that. He's doing it, wait it works
Me: Yeah, you want to make stuff scream you do this with indexes etc because the problem you asked is a data matching problem