Python – Management of a SQLite database

By | 02/08/2023

In this post, we will see how to manage a SQLite database with Python.
But first of all, what is SQLite?
“SQLite is a lightweight, file-based relational database management system that is widely used due to its simplicity, portability, and ease of integration into various programming languages.
It provides a self-contained, serverless, and zero-configuration architecture, making it an excellent choice for small to medium-sized applications and projects.
SQLite differs from traditional client-server databases in that the entire database is contained within a single file, making it easy to distribute, copy, or backup. It stores data in a structured manner, allowing us to define tables, columns, and relationships similar to other relational databases.”

For more information, we can go to the official website: www.sqlite.org.

One of the significant advantages of SQLite is its seamless integration with programming languages like Python. The Python Standard Library includes the sqlite3 module, which provides a convenient and intuitive way to interact with SQLite databases.
For this post, we are going to create a class called “CoreSQLite” that we will use to manage all the CRUD operations for a table called “TabUser” created in a DB called “UserDb”.

CHECKING DB:

import sqlite3
import os


class CoreSQLite:
    """Create the SQLite database file and its TabUser table on first use.

    The database file is ``<dbName>.db`` in the current working directory.
    """

    # Most recently opened connection; it is closed again before checkDB returns.
    connection = None

    def __init__(self, dbName):
        """Store the database name and make sure the database file exists.

        Args:
            dbName: Database name without the ``.db`` extension.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        self.checkDB()

    def checkDB(self):
        """Create the database file and the TabUser table if the file is missing.

        Prints whether the file was created or already existed.
        """
        # Get the current working directory
        working_dir = os.getcwd()
        # Construct the full path to the database file
        checkDbFile = os.path.join(working_dir, self.dbFileName)
        # checking the existence of the database
        if not os.path.exists(checkDbFile):
            # connect() creates a missing file by default. The plain path is
            # passed instead of a hand-built 'file:...?mode=rwc' URI, because
            # a '#', '?' or '%' in the path would be interpreted as URI syntax
            # and the database would be created under a different name.
            self.connection = sqlite3.connect(checkDbFile)
            try:
                # Create the TabUser table
                self._createTabUser()
            finally:
                # Close the connection even when the table creation failed
                self.connection.close()
            print(
                f"SQLite database file '{self.dbFileName}' created successfully.")
        else:
            print(f"SQLite database file '{self.dbFileName}' already exists.")

    def _createTabUser(self):
        """Create the TabUser table (Id, Username, Password) and commit it.

        Expects ``self.connection`` to be an open connection.
        """
        # Create a cursor object to execute SQL statements
        cursor = self.connection.cursor()
        try:
            # NOTE: SQLite does not enforce the lengths in TEXT(250)/TEXT(20).
            cursor.execute(
                'CREATE TABLE TabUser (Id INTEGER PRIMARY KEY AUTOINCREMENT, Username TEXT(250),Password TEXT(20))')
            self.connection.commit()
        finally:
            cursor.close()


[MAIN.PY]

# Import the CoreSQLite class from the CoreSQLite.py module
from CoreSQLite import CoreSQLite

# Instantiating the class runs checkDB(): UserDb.db (with the TabUser table)
# is created in the current working directory when it does not exist yet
objDb = CoreSQLite('UserDb')

If we run the main.py file, the following will be the result:


Using a tool like DBeaver, we can check if the table TabUser has been created in the database:



INSERTING A NEW USER:

import sqlite3
import os


class CoreSQLite:
    """SQLite helper for the TabUser table; this listing adds the insert."""

    # Connection and cursor of the operation currently (or last) executed
    connection = None
    cursor = None

    def __init__(self, dbName):
        """Store the database name and make sure the database file exists.

        Args:
            dbName: Database name without the ``.db`` extension.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        self.checkDB()

    def checkDB(self): ...  # body elided in this listing

    def _createTabUser(self): ...  # body elided in this listing

    def _openCursor(self):
        """Open a connection to the database file and a cursor on it.

        The results are stored in ``self.connection`` and ``self.cursor``;
        the caller is responsible for closing both.
        """
        # Get the current working directory
        working_dir = os.getcwd()
        # Construct the full path to the database file
        dbFile = os.path.join(working_dir, self.dbFileName)
        self.connection = sqlite3.connect(dbFile)
        self.cursor = self.connection.cursor()

    # Create (Insert) operation
    def create_user(self, userName, password):
        """Insert one row into TabUser and print a confirmation.

        Args:
            userName: Value stored in the Username column.
            password: Value stored in the Password column.

        Raises:
            sqlite3.Error: If the insert fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'INSERT INTO TabUser (username, password) VALUES (?, ?)', (userName, password))
            self.connection.commit()
        finally:
            # Release the cursor and the connection even when the insert failed
            self.cursor.close()
            self.connection.close()
        # NOTE(review): the password is stored and echoed in clear text, which
        # is acceptable for a demo only.
        print(f"User {userName} {password} created successfully.")


[MAIN.PY]

from CoreSQLite import CoreSQLite

objDb = CoreSQLite('UserDb')

# creating 20 users (User1 .. User20); range() excludes its stop value,
# so the stop has to be 21
for i in range(1, 21):
    objDb.create_user(f"User{i}", f"Pass123_{i}")

If we run the main.py file, the following will be the result:



SELECTING ALL USERS:

import sqlite3
import os


class CoreSQLite:
    """SQLite helper for the TabUser table; this listing adds the select."""

    # Connection and cursor of the operation currently (or last) executed
    connection = None
    cursor = None

    def __init__(self, dbName): ...  # body elided in this listing

    def checkDB(self): ...  # body elided in this listing

    def _createTabUser(self): ...  # body elided in this listing

    def _openCursor(self): ...  # body elided in this listing

    # Create (Insert) operation
    def create_user(self, userName, password): ...  # body elided in this listing

    # Read operation
    def get_users(self):
        """Print every row of TabUser, one line per user.

        Raises:
            sqlite3.Error: If the query fails; the connection is still closed.
        """
        self._openCursor()
        try:
            # Columns are named explicitly because the rows are read by
            # position below.
            self.cursor.execute('SELECT Id, Username, Password FROM TabUser')
            rows = self.cursor.fetchall()
        finally:
            # Release the cursor and the connection even when the query failed
            self.cursor.close()
            self.connection.close()
        for row in rows:
            print(f"ID: {row[0]}, UserName: {row[1]}, Password: {row[2]}")


[MAIN.PY]

from CoreSQLite import CoreSQLite

# Open the UserDb database (the file is created on the first run)
objDb = CoreSQLite('UserDb')

# Print every row of TabUser, one line per user
objDb.get_users()

If we run the main.py file, the following will be the result:



UPDATING A USER:

import sqlite3
import os


class CoreSQLite:
    """SQLite helper for the TabUser table; this listing adds the update."""

    # Connection and cursor of the operation currently (or last) executed
    connection = None
    cursor = None

    def __init__(self, dbName): ...  # body elided in this listing

    def checkDB(self): ...  # body elided in this listing

    def _createTabUser(self): ...  # body elided in this listing

    def _openCursor(self): ...  # body elided in this listing

    # Create (Insert) operation
    def create_user(self, userName, password): ...  # body elided in this listing

    # Read operation
    def get_users(self): ...  # body elided in this listing

    # Update operation
    def update_user(self, id, username, password):
        """Overwrite the username and password of the row with the given id.

        The confirmation is printed even when no row has that id.

        Args:
            id: Value of the Id column of the row to update.
            username: New value for the Username column.
            password: New value for the Password column.

        Raises:
            sqlite3.Error: If the update fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'UPDATE TabUser SET username = ?, password = ? WHERE id = ?', (username, password, id))
            self.connection.commit()
        finally:
            # Release the cursor and the connection even when the update failed
            self.cursor.close()
            self.connection.close()
        print("User updated successfully.")


[MAIN.PY]

from CoreSQLite import CoreSQLite

# Open the UserDb database (the file is created on the first run)
objDb = CoreSQLite('UserDb')

# Create a user
objDb.create_user('DamianoUser', 'Pass123')

# Select users (state before the update)
objDb.get_users()

# Update the user whose Id is 1
# NOTE: Id 1 is the user created above only if the table was empty on a
# freshly created database - TODO confirm before running against old data
objDb.update_user(1, 'UpdateDamianoUser', 'UpdatePass123')

# Select users (state after the update)
objDb.get_users()

If we run the main.py file, the following will be the result:



DELETING A USER:

import sqlite3
import os


class CoreSQLite:
    """SQLite helper for the TabUser table; this listing adds the delete."""

    # Connection and cursor of the operation currently (or last) executed
    connection = None
    cursor = None

    def __init__(self, dbName): ...  # body elided in this listing

    def checkDB(self): ...  # body elided in this listing

    # Stub added so this listing shows the same members as the other listings
    def _createTabUser(self): ...  # body elided in this listing

    def _openCursor(self): ...  # body elided in this listing

    # Create (Insert) operation
    def create_user(self, userName, password): ...  # body elided in this listing

    # Read operation
    def get_users(self): ...  # body elided in this listing

    # Update operation
    def update_user(self, id, username, password): ...  # body elided in this listing

    # Delete operation
    def delete_user(self, id):
        """Delete the row with the given id from TabUser.

        The confirmation is printed even when no row has that id.

        Args:
            id: Value of the Id column of the row to delete.

        Raises:
            sqlite3.Error: If the delete fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute('DELETE FROM TabUser WHERE id = ?', (id,))
            self.connection.commit()
        finally:
            # Release the cursor and the connection even when the delete failed
            self.cursor.close()
            self.connection.close()
        print("User deleted successfully.")


[MAIN.PY]

from CoreSQLite import CoreSQLite

# Open the UserDb database (the file is created on the first run)
objDb = CoreSQLite('UserDb')

# Create a user
objDb.create_user('DamianoUser', 'Pass123')

# Select users (state before the delete)
objDb.get_users()

# Delete the user whose Id is 1
# NOTE: Id 1 is the user created above only if the table was empty on a
# freshly created database - TODO confirm before running against old data
objDb.delete_user(1)

# Select users (state after the delete)
objDb.get_users()

If we run the main.py file, the following will be the result:



[CORESQLITE.PY]

import sqlite3
import os


class CoreSQLite:
    """CRUD helper for the TabUser table of a SQLite database.

    The database file is ``<dbName>.db`` in the current working directory.
    Every operation opens its own connection and closes it before returning.
    """

    # Connection and cursor of the operation currently (or last) executed
    connection = None
    cursor = None

    def __init__(self, dbName):
        """Store the database name and make sure the database file exists.

        Args:
            dbName: Database name without the ``.db`` extension.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        self.checkDB()

    def checkDB(self):
        """Create the database file and the TabUser table if the file is missing.

        Prints whether the file was created or already existed.
        """
        # Get the current working directory
        working_dir = os.getcwd()
        # Construct the full path to the database file
        checkDbFile = os.path.join(working_dir, self.dbFileName)
        # checking the existence of the database
        if not os.path.exists(checkDbFile):
            # connect() creates a missing file by default. The plain path is
            # passed instead of a hand-built 'file:...?mode=rwc' URI, because
            # a '#', '?' or '%' in the path would be interpreted as URI syntax
            # and the database would be created under a different name.
            self.connection = sqlite3.connect(checkDbFile)
            try:
                # Create the TabUser table
                self._createTabUser()
            finally:
                # Close the connection even when the table creation failed
                self.connection.close()
            print(
                f"SQLite database file '{self.dbFileName}' created successfully.")
        else:
            print(f"SQLite database file '{self.dbFileName}' already exists.")

    def _createTabUser(self):
        """Create the TabUser table (Id, Username, Password) and commit it.

        Expects ``self.connection`` to be an open connection.
        """
        # Create a cursor object to execute SQL statements
        cursor = self.connection.cursor()
        try:
            # NOTE: SQLite does not enforce the lengths in TEXT(250)/TEXT(20).
            cursor.execute(
                'CREATE TABLE TabUser (Id INTEGER PRIMARY KEY AUTOINCREMENT, Username TEXT(250),Password TEXT(20))')
            self.connection.commit()
        finally:
            cursor.close()

    def _openCursor(self):
        """Open a connection to the database file and a cursor on it.

        The results are stored in ``self.connection`` and ``self.cursor``;
        the caller is responsible for closing both (see ``_closeCursor``).
        """
        # Get the current working directory
        working_dir = os.getcwd()
        # Construct the full path to the database file
        dbFile = os.path.join(working_dir, self.dbFileName)
        self.connection = sqlite3.connect(dbFile)
        self.cursor = self.connection.cursor()

    def _closeCursor(self):
        """Close the cursor and the connection opened by ``_openCursor``."""
        self.cursor.close()
        self.connection.close()

    # Create (Insert) operation
    def create_user(self, userName, password):
        """Insert one row into TabUser and print a confirmation.

        Args:
            userName: Value stored in the Username column.
            password: Value stored in the Password column.

        Raises:
            sqlite3.Error: If the insert fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'INSERT INTO TabUser (username, password) VALUES (?, ?)', (userName, password))
            self.connection.commit()
        finally:
            # Release the resources even when the statement failed
            self._closeCursor()
        # NOTE(review): the password is stored and echoed in clear text, which
        # is acceptable for a demo only.
        print(f"User {userName} {password} created successfully.")

    # Read operation
    def get_users(self):
        """Print every row of TabUser, one line per user.

        Raises:
            sqlite3.Error: If the query fails; the connection is still closed.
        """
        self._openCursor()
        try:
            # Columns are named explicitly because the rows are read by
            # position below.
            self.cursor.execute('SELECT Id, Username, Password FROM TabUser')
            rows = self.cursor.fetchall()
        finally:
            self._closeCursor()
        for row in rows:
            print(f"ID: {row[0]}, UserName: {row[1]}, Password: {row[2]}")

    # Update operation
    def update_user(self, id, username, password):
        """Overwrite the username and password of the row with the given id.

        The confirmation is printed even when no row has that id.

        Args:
            id: Value of the Id column of the row to update.
            username: New value for the Username column.
            password: New value for the Password column.

        Raises:
            sqlite3.Error: If the update fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'UPDATE TabUser SET username = ?, password = ? WHERE id = ?', (username, password, id))
            self.connection.commit()
        finally:
            self._closeCursor()
        print("User updated successfully.")

    # Delete operation
    def delete_user(self, id):
        """Delete the row with the given id from TabUser.

        The confirmation is printed even when no row has that id.

        Args:
            id: Value of the Id column of the row to delete.

        Raises:
            sqlite3.Error: If the delete fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute('DELETE FROM TabUser WHERE id = ?', (id,))
            self.connection.commit()
        finally:
            self._closeCursor()
        print("User deleted successfully.")



Leave a Reply

Your email address will not be published. Required fields are marked *