Skip to content

improves error reporting, enables foreign keys by default #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
package_dir={"": "src"},
packages=["cs50"],
url="https://github.com/cs50/python-cs50",
version="4.0.4"
version="5.0.0"
)
28 changes: 13 additions & 15 deletions src/cs50/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,16 @@ def __init__(self, url, **kwargs):
if not os.path.isfile(matches.group(1)):
raise RuntimeError("not a file: {}".format(matches.group(1)))

# Remember foreign_keys and remove it from kwargs
foreign_keys = kwargs.pop("foreign_keys", False)

# Create engine, raising exception if back end's module not installed
self.engine = sqlalchemy.create_engine(url, **kwargs)

# Enable foreign key constraints
if foreign_keys:
def connect(dbapi_connection, connection_record):
if type(dbapi_connection) is sqlite3.Connection: # If back end is sqlite
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
sqlalchemy.event.listen(self.engine, "connect", connect)
def connect(dbapi_connection, connection_record):
if type(dbapi_connection) is sqlite3.Connection: # If back end is sqlite
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
sqlalchemy.event.listen(self.engine, "connect", connect)

else:

Expand Down Expand Up @@ -263,27 +259,29 @@ def execute(self, sql, *args, **kwargs):
row[column] = float(row[column])
ret = rows

# If INSERT, return primary key value for a newly inserted row
# If INSERT, return primary key value for a newly inserted row (or None if none)
elif value == "INSERT":
if self.engine.url.get_backend_name() in ["postgres", "postgresql"]:
result = self.engine.execute("SELECT LASTVAL()")
ret = result.first()[0]
else:
ret = result.lastrowid
ret = result.lastrowid if result.lastrowid > 0 else None

# If DELETE or UPDATE, return number of rows matched
elif value in ["DELETE", "UPDATE"]:
ret = result.rowcount

# If constraint violated, return None
except sqlalchemy.exc.IntegrityError:
except sqlalchemy.exc.IntegrityError as e:
self._logger.debug(termcolor.colored(statement, "yellow"))
return None
e = RuntimeError(e.orig)
e.__cause__ = None
raise e

# If user errror
except sqlalchemy.exc.OperationalError as e:
self._logger.debug(termcolor.colored(statement, "red"))
e = RuntimeError(_parse_exception(e))
e = RuntimeError(e.orig)
e.__cause__ = None
raise e

Expand Down
33 changes: 14 additions & 19 deletions tests/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ def test_string_literal_with_colon(self):

def tearDown(self):
self.db.execute("DROP TABLE cs50")
self.db.execute("DROP TABLE IF EXISTS foo")
self.db.execute("DROP TABLE IF EXISTS bar")

@classmethod
def tearDownClass(self):
Expand Down Expand Up @@ -132,29 +134,27 @@ class SQLiteTests(SQLTests):
def setUpClass(self):
open("test.db", "w").close()
self.db = SQL("sqlite:///test.db")
open("test1.db", "w").close()
self.db1 = SQL("sqlite:///test1.db", foreign_keys=True)

def setUp(self):
self.db.execute("DROP TABLE IF EXISTS cs50")
self.db.execute("CREATE TABLE cs50(id INTEGER PRIMARY KEY, val TEXT)")

def test_lastrowid(self):
self.db.execute("CREATE TABLE foo(id INTEGER PRIMARY KEY AUTOINCREMENT, firstname TEXT, lastname TEXT)")
self.assertEqual(self.db.execute("INSERT INTO foo (firstname, lastname) VALUES('firstname', 'lastname')"), 1)
self.assertRaises(RuntimeError, self.db.execute, "INSERT INTO foo (id, firstname, lastname) VALUES(1, 'firstname', 'lastname')")
self.assertEqual(self.db.execute("INSERT OR IGNORE INTO foo (id, firstname, lastname) VALUES(1, 'firstname', 'lastname')"), None)

def test_integrity_constraints(self):
self.db.execute("CREATE TABLE foo(id INTEGER PRIMARY KEY)")
self.assertEqual(self.db.execute("INSERT INTO foo VALUES(1)"), 1)
self.assertRaises(RuntimeError, self.db.execute, "INSERT INTO foo VALUES(1)")

def test_foreign_key_support(self):
self.db.execute("DROP TABLE IF EXISTS foo")
self.db.execute("CREATE TABLE foo(id INTEGER PRIMARY KEY)")
self.db.execute("DROP TABLE IF EXISTS bar")
self.db.execute("CREATE TABLE bar(foo_id INTEGER, FOREIGN KEY (foo_id) REFERENCES foo(id))")
self.assertEqual(self.db.execute("INSERT INTO bar VALUES(50)"), 1)

self.db1.execute("DROP TABLE IF EXISTS foo")
self.db1.execute("CREATE TABLE foo(id INTEGER PRIMARY KEY)")
self.db1.execute("DROP TABLE IF EXISTS bar")
self.db1.execute("CREATE TABLE bar(foo_id INTEGER, FOREIGN KEY (foo_id) REFERENCES foo(id))")
self.assertEqual(self.db1.execute("INSERT INTO bar VALUES(50)"), None)

self.assertRaises(RuntimeError, self.db.execute, "INSERT INTO bar VALUES(50)")

def test_qmark(self):
self.db.execute("DROP TABLE IF EXISTS foo")
self.db.execute("CREATE TABLE foo (firstname STRING, lastname STRING)")

self.db.execute("INSERT INTO foo VALUES (?, 'bar')", "baz")
Expand Down Expand Up @@ -188,7 +188,6 @@ def test_qmark(self):
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "bar", "lastname": "baz"}])
self.db.execute("DELETE FROM foo")

self.db.execute("DROP TABLE IF EXISTS bar")
self.db.execute("CREATE TABLE bar (firstname STRING)")
self.db.execute("INSERT INTO bar VALUES (?)", "baz")
self.assertEqual(self.db.execute("SELECT * FROM bar"), [{"firstname": "baz"}])
Expand All @@ -203,7 +202,6 @@ def test_qmark(self):
self.assertRaises(RuntimeError, self.db.execute, "INSERT INTO foo VALUES (?, ?)", 'bar', baz='baz')

def test_named(self):
self.db.execute("DROP TABLE IF EXISTS foo")
self.db.execute("CREATE TABLE foo (firstname STRING, lastname STRING)")

self.db.execute("INSERT INTO foo VALUES (:baz, 'bar')", baz="baz")
Expand All @@ -226,7 +224,6 @@ def test_named(self):
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "bar", "lastname": "baz"}])
self.db.execute("DELETE FROM foo")

self.db.execute("DROP TABLE IF EXISTS bar")
self.db.execute("CREATE TABLE bar (firstname STRING)")
self.db.execute("INSERT INTO bar VALUES (:baz)", baz="baz")
self.assertEqual(self.db.execute("SELECT * FROM bar"), [{"firstname": "baz"}])
Expand All @@ -238,7 +235,6 @@ def test_named(self):


def test_numeric(self):
self.db.execute("DROP TABLE IF EXISTS foo")
self.db.execute("CREATE TABLE foo (firstname STRING, lastname STRING)")

self.db.execute("INSERT INTO foo VALUES (:1, 'bar')", "baz")
Expand Down Expand Up @@ -272,7 +268,6 @@ def test_numeric(self):
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "bar", "lastname": "baz"}])
self.db.execute("DELETE FROM foo")

self.db.execute("DROP TABLE IF EXISTS bar")
self.db.execute("CREATE TABLE bar (firstname STRING)")
self.db.execute("INSERT INTO bar VALUES (:1)", "baz")
self.assertEqual(self.db.execute("SELECT * FROM bar"), [{"firstname": "baz"}])
Expand Down