Imperative Programming for Databases
"""
These imports define the key objects
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
"""
These objects and definitions are used throughout the Jupyter Notebook.
"""
# Flask application object shared by the rest of the notebook
app = Flask(__name__)
# Database URI (path and filename) handed to SQLAlchemy
data = 'sqlite:///sqlite.db'
# Apply all configuration values in a single call
app.config.update(
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    SQLALCHEMY_DATABASE_URI=data,
    SECRET_KEY='SECRET_KEY',
)
# Create the SQLAlchemy object unbound, then attach it to the app.
# The init_app call belongs in a place where it runs once per project.
db = SQLAlchemy()
db.init_app(app)
""" database dependencies to support sqlite examples """
import datetime
from datetime import datetime
import json
from sqlalchemy.exc import IntegrityError
from werkzeug.security import generate_password_hash, check_password_hash
''' Tutorial: https://www.sqlalchemy.org/library.html#tutorials, try to get into a Python shell and follow along '''
# Define the Symptom class to manage actions in the 'symptoms' table
# -- Object Relational Mapping (ORM) is the key concept of SQLAlchemy
# -- a.) db.Model is like an inner layer of the onion in ORM
# -- b.) Symptom represents data we want to store, something that is built on db.Model
# -- c.) SQLAlchemy ORM is a layer on top of SQLAlchemy Core, then SQLAlchemy engine, SQL
class Symptom(db.Model):
    """ORM model for one row of the 'symptoms' table.

    Each record holds a person's name, a free-text comment and the symptom
    the comment addresses. The mapped columns are stored in underscore-prefixed
    attributes and exposed through the ``name``, ``ucomment`` and ``usymptom``
    properties.
    """

    __tablename__ = 'symptoms'  # table name is plural, class name is singular
    # Lets the class be redefined against an already-registered table
    # (e.g. when the defining notebook cell is executed again).
    __table_args__ = {'extend_existing': True}

    id = db.Column(db.Integer, primary_key=True)
    # The name is the lookup key used by the CRUD helpers, so it stays unique.
    _name = db.Column(db.String(255), unique=True, nullable=False)
    # Comment and symptom are free text that several people can share
    # (the sample data itself contains 'cramps' twice), so they must not be
    # unique. NOTE: create_all() does not alter a table that already exists;
    # an older database file keeps its previous constraints until recreated.
    _ucomment = db.Column(db.String(255), nullable=False)
    _usymptom = db.Column(db.String(255), nullable=False)

    def __init__(self, name, ucomment, usymptom):
        """Initialize the instance variables of a new Symptom object.

        Args:
            name: name of the person the record belongs to.
            ucomment: comment text stored with the record.
            usymptom: symptom the comment addresses.
        """
        self._name = name
        self._ucomment = ucomment
        self._usymptom = usymptom

    @property
    def name(self):
        """Name stored in this record."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def ucomment(self):
        """Comment stored in this record."""
        return self._ucomment

    @ucomment.setter
    def ucomment(self, ucomment):
        self._ucomment = ucomment

    def is_name(self, name):
        """Return True when *name* equals the name stored in this record."""
        return self._name == name

    @property
    def usymptom(self):
        """Symptom stored in this record."""
        return self._usymptom

    @usymptom.setter
    def usymptom(self, usymptom):
        self._usymptom = usymptom

    def __str__(self):
        """Return the record as a JSON string (see read())."""
        return json.dumps(self.read())

    def create(self):
        """CRUD create: insert this object into the database.

        Returns:
            self on success, or None when the insert violates a database
            constraint (for example a duplicate name).
        """
        try:
            db.session.add(self)  # stage the object for insertion
            db.session.commit()  # the unit-of-work pattern requires an explicit commit
            return self
        except IntegrityError:
            # Roll back the failed transaction so the session can be used again.
            db.session.rollback()
            return None

    def read(self):
        """CRUD read: return the record as a dictionary."""
        return {
            "id": self.id,
            "name": self.name,
            "comment": self.ucomment,
            "symptom": self.usymptom,
        }

    def update(self, name="", ucomment="", usymptom=""):
        """CRUD update: change the given fields and commit.

        Only non-empty values are applied; empty strings (and None) leave the
        corresponding field untouched.

        Returns:
            self, after the commit.
        """
        if name:
            self.name = name
        if ucomment:
            self.ucomment = ucomment
        if usymptom:
            self.usymptom = usymptom
        db.session.commit()
        return self

    def delete(self):
        """CRUD delete: remove this record from the database and commit."""
        db.session.delete(self)
        db.session.commit()
        return None
"""Database Creation and Testing """
# Builds working data for testing
def initSymptoms():
    """Create the database tables and insert the sample symptom records.

    Prints one line per sample record: a success message when the row was
    inserted, or a "records exist" message when the insert was rejected
    (Symptom.create() returned None) or raised an error.
    """
    with app.app_context():
        # Create the tables in the database so that data can be added
        db.create_all()

        # Tester data for the table
        symptoms = [
            Symptom(name='Annika Liao', ucomment='tylenol', usymptom='cramps'),
            Symptom(name='Claire Chen', ucomment='tea', usymptom='headaches'),
            Symptom(name='Claire Zhao', ucomment='heating pad', usymptom='cramps'),
            Symptom(name='Grace Wang', ucomment='magnesium', usymptom='muscle aches'),
            Symptom(name='Emma Shen', ucomment='vitamin d', usymptom='acne'),
        ]

        for symptom in symptoms:
            # Capture the name first so it can be reported even if the insert fails
            name = symptom.name
            try:
                created = symptom.create()
            except Exception:
                # Best-effort seeding: undo the failed transaction and move on
                db.session.rollback()
                created = None

            # create() signals bad or duplicate data by returning None
            if created is None:
                print(f"Records exist uid {name}, or error.")
            else:
                print(f"Created new uid {created.name}")
# Run the seeding step now: creates the tables and attempts to insert the sample rows
initSymptoms()
import sqlite3
# Path of the SQLite file opened by the raw sqlite3 helpers below.
# NOTE(review): presumably the same file SQLAlchemy creates for 'sqlite:///sqlite.db' — confirm.
database = 'instance/sqlite.db'
def schema():
    """Print the column metadata of the 'symptoms' table.

    Opens the SQLite file named by the module-level ``database`` path, runs
    ``PRAGMA table_info`` for the table and prints one tuple per column.
    """
    conn = sqlite3.connect(database)
    try:
        cursor = conn.cursor()
        # 'symptoms' is the table this file defines and that the other helpers use
        results = cursor.execute("PRAGMA table_info('symptoms')").fetchall()
    finally:
        # Release the connection even when the query raises
        conn.close()

    for row in results:
        print(row)
# Print the column metadata gathered by schema()
schema()
# metadata for database
def find_by_name(name):
    """Return the first Symptom record whose _name equals *name*, or None.

    The query runs inside a Flask application context.
    """
    with app.app_context():
        matches = Symptom.query.filter_by(_name=name)
        return matches.first()
import sqlite3
def read():
    """Print every row of the 'symptoms' table, or a notice when it is empty.

    Opens the SQLite file named by the module-level ``database`` path.
    """
    conn = sqlite3.connect(database)
    try:
        cursor = conn.cursor()
        results = cursor.execute('SELECT * FROM symptoms').fetchall()
    finally:
        # Release the connection even when the query raises
        conn.close()

    if not results:
        print("Table is empty")
        return
    for row in results:
        print(row)
# Print the current contents of the symptoms table
read()
import sqlite3
def create():
    """Prompt for a name, comment and symptom and insert them as a new row.

    The values are read with input() and written to the 'symptoms' table with
    a parameterized INSERT. SQLite errors are printed rather than raised.
    """
    name = input("Enter your name:")
    ucomment = input("Enter your comment:")
    usymptom = input("Enter the symptom you're addressing")

    insert_sql = "INSERT INTO symptoms (_name, _ucomment, _usymptom) VALUES (?, ?, ?)"
    values = (name, ucomment, usymptom)

    # Open the database file and obtain a cursor for the statement
    connection = sqlite3.connect(database)
    cur = connection.cursor()

    try:
        cur.execute(insert_sql, values)
        # Persist the new row
        connection.commit()
        print(f"A new record {name} has been created")
    except sqlite3.Error as error:
        print("Error while the INSERT:", error)

    # Release the cursor, then the connection
    cur.close()
    connection.close()
#create()
import sqlite3
def update():
    """Prompt for a name and a new comment, then update that record's comment.

    The record is looked up with find_by_name() and its current values are
    printed. Entering a blank comment leaves the record unchanged instead of
    overwriting the stored comment with an empty string. SQLite errors are
    printed rather than raised.
    """
    name = input("Enter your name to update records")
    user = find_by_name(name)
    if user is None:
        print(f"No user found with name {name}")
        return

    print("Current user information:")
    print(user.read())

    ucomment = input("Enter comment")
    if not ucomment:
        # Nothing was entered: do not overwrite the stored comment
        print("No comment entered, record left unchanged")
        return

    conn = sqlite3.connect(database)
    try:
        cursor = conn.cursor()
        cursor.execute("UPDATE symptoms SET _ucomment = ? WHERE _name = ?", (ucomment, name))
        if cursor.rowcount == 0:
            # No row matched the name
            print(f"No name {name} was found in the table")
        else:
            print(f"The row with name {name} has been updated, comment changed to {ucomment}")
        conn.commit()
    except sqlite3.Error as error:
        print("Error while executing the UPDATE:", error)
    finally:
        # Release the connection on every path, including unexpected errors
        conn.close()
#update()
import sqlite3
def delete():
    """Prompt for a name and delete the matching row from 'symptoms'.

    Reports whether a row was removed. SQLite errors are printed rather than
    raised.
    """
    name = input("Enter name to delete")

    # Open the database file and obtain a cursor for the statement
    connection = sqlite3.connect(database)
    cur = connection.cursor()

    try:
        cur.execute("DELETE FROM symptoms WHERE _name = ?", (name,))
        if cur.rowcount != 0:
            # A row matched the name and was deleted
            print(f"The record of name {name} was successfully deleted")
        else:
            # No row matched the name
            print(f"Name {name} was not found in the table")
        connection.commit()
    except sqlite3.Error as error:
        print("Error while executing the DELETE:", error)

    # Release the cursor, then the connection
    cur.close()
    connection.close()
#delete()
def menu():
    """Run the interactive CRUD menu until an empty line is entered.

    Each prompt accepts c, r, u, d or s (case-insensitive) and runs the
    matching helper. The menu repeats in a loop rather than by recursion, so a
    long session cannot exhaust the call stack.
    """
    while True:
        operation = input("Enter: (C)reate (R)ead (U)pdate or (D)elete or (S)chema")
        choice = operation.lower()
        if choice == 'c':
            create()
        elif choice == 'r':
            read()
        elif choice == 'u':
            update()
        elif choice == 'd':
            delete()
        elif choice == 's':
            schema()
        elif len(operation) == 0:  # empty input leaves the menu
            return
        else:
            print("Please enter c, r, u, d, or s")
# Start the menu. Ordinary errors (for example helpers that have not been
# defined yet) produce a hint; KeyboardInterrupt and SystemExit are not
# intercepted, so interrupting the prompt is not reported as a setup problem.
try:
    menu()  # start menu
except Exception:
    print("Perform Jupyter 'Run All' prior to starting menu")