Python – CRUD operations in Redis

By | 31/01/2024

In this post, we will see how to implement CRUD operations for an ‘User’ object in Redis, defines as follows:

{
  "name": string, 
  "age": int, 
  "city": string
}

Before to start, I want to remember that REDIS (Remote Dictionary Server), is an open-source, in-memory data structure store, used as a database, cache, and message broker. It supports various data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, geospatial indexes, and streams.
The common use cases for Redis are:
Caching: the most common use case, reducing the load on databases and improving response times.
Session Storage: storing user session data for web applications.
Real-Time Analytics: suitable for scenarios needing real-time analysis of data, like in gaming, advertising, etc.
For all information, the redis web site is: https://redis.io/

First of all, we define a docker-compose file to create a docker container with an instance of Redis:

version: '3.8'  # Specifies the Docker Compose version

services:
  redis:
    image: redis:latest  # Use the latest Redis image
    ports:
      - "6379:6379"  # Map the default Redis port to the same port on the host
    volumes:
      - redis_data:/data  # Persist data using a volume

volumes:
  redis_data:  # Defines the volume to persist Redis data

Then, using the command docker-compose up -d, we will create the container:

Docker

Finally, with the command docker ps, we can verify that everything is working correctly:

Docker


Before starting to write code, we have to install Redis Python Client, using the command:

pip install redis
install redis


Now, we can start to develop code!
First, we create a config file to store Redis parameters:
[CONFIG.INI]

[redis]
server = localhost
port = 6379
db = 0

Then, we define the RedisRepository file used to manage CRUD operations:
[REDISREPOSITORY.PY]

import configparser

class RedisRepository:
    redisclient = None
    server = None
    port = None
    database = None

    # method used to get the parameters to connect to Redis
    def _get_redis_parameters(self):
        # definition of the object used to read the config file
        configfile = configparser.ConfigParser()
        configfile.read("config.ini")

        rediscache = configfile["redis"]
        server = rediscache["server"]
        port = rediscache["port"]
        database = rediscache["db"]

    def __init__(self):
        # get Redis paramters
        self._get_redis_parameters()

Finally, we will add all methods for the CRUD operations!

INSERT USER:

import configparser
import redis
import uuid


class RedisRepository:
    redisclient = None
    server = None
    port = None
    database = None

    # method used to get the parameters to connect to Redis
    def _get_redis_parameters(self):
        # definition of the object used to read the config file
        configfile = configparser.ConfigParser()
        configfile.read("config.ini")

        rediscache = configfile["redis"]
        self.server = rediscache["server"]
        self.port = rediscache["port"]
        self.database = rediscache["db"]

    def __init__(self):
        # get Redis paramters
        self._get_redis_parameters()

    def insert_user(self, name, age, city):
        """
        Creates a new user in Redis with a GUID as the user ID.
        :param name: Name of the user.
        :param age: Age of the user.
        :param city: City of the user.
        :return: The GUID of the created user.
        """
        user_id = str(uuid.uuid4())  # Generate a GUID for the user ID
        user_key = f"user:{user_id}"
        user_data = {
            "name": name,
            "age": age,
            "city": city
        }

        try:
            # Using a context manager to handle the Redis connection
            # This ensures that the connection is properly closed after
            # the block of code is executed, even if an exception occurs:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                #  Hset is a command in Redis that is used to set the value of one or more
                # fields in a hash. It's a very versatile and commonly used command in Redis,
                # especially when dealing with objects or entities that have multiple attributes
                client.hset(user_key, mapping=user_data)
                # Add the user key to the set of users
                client.sadd("users", user_key)
                return user_id
        except Exception as error:
            print(f"Error during the insert operation, because of: {error}")
            return 0

[MAIN.PY]

from RediRepository import RedisRepository

redisrepo = RedisRepository()

for item in range(1, 10):
    newId = redisrepo.insert_user(f'Name_{item}', f'City_{item}', 40+item)
    if newId == "0":
        print("The user hasn't been created!")
    else:
        print(f"The user has been created with the Id - {newId}")

If we run the file main.py, the following will be the result:

Insert data in Redis with Python
Insert data in Redis with Python
Insert data in Redis with Python


SELECT USER:

def select_all_users(self):
        """
        Retrieves all users from Redis.
        :return: A list of user data dictionaries.
        """
        all_users = []
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                # Retrieve all user keys from the set
                user_keys = client.smembers("users")
                for user_key in user_keys:
                    # Fetch each user's data
                    user_data = client.hgetall(user_key.decode('utf-8'))
                    # Convert bytes to string for each field
                    user_data = {k.decode('utf-8'): v.decode('utf-8')
                                 for k, v in user_data.items()}
                    all_users.append(user_data)
        except Exception as error:
            print(f"Error during the select all users operation: {error}")

        return all_users

[MAIN.PY]

from RediRepository import RedisRepository

redisrepo = RedisRepository()

users = redisrepo.select_all_users()
for user in users:
    # print all fields
    print(user)
    # print only name field
    print(f"{user['name']}")

If we run the file main.py, the following will be the result:

Select data in Redis with Python

def select_user_by_name(self, nameinput):
        """
        Finds a user in Redis by their name.
        :param nameinput: The name of the user to find.
        :return: User data if found, else None.
        """
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                # Retrieve all user keys from the set
                user_keys = client.smembers("users")
                for user_key in user_keys:
                    # Fetch each user's data
                    user_data = client.hgetall(user_key.decode('utf-8'))
                    # Convert bytes to string for each field
                    user_data = {k.decode('utf-8'): v.decode('utf-8')
                                 for k, v in user_data.items()}

                    if user_data.get("name") == nameinput:
                        return user_data
        except Exception as error:
            print(f"Error during the select users by name operation: {error}")

        return None

[MAIN.PY]

from RediRepository import RedisRepository

redisrepo = RedisRepository()

users = redisrepo.select_all_users()
for user in users:
    # print all fields
    print(user)

nametofind = 'Name_1'
userstofind = redisrepo.select_user_by_name(nametofind)

if userstofind:
    print(f"User found: {userstofind}")
else:
    print(f"No user found with the name {nametofind}")


nametofind = 'Name_100'
userstofind = redisrepo.select_user_by_name(nametofind)

if userstofind:
    print(f"User found: {userstofind}")
else:
    print(f"No user found with the name {nametofind}")

If we run the file main.py, the following will be the result:

Select data in Redis with Python

def select_all_users_with_id(self):
        """
        Retrieves all users from Redis, including their IDs.
        :return: A list of dictionaries, each containing user data along with the user ID.
        """
        all_users_with_id = []
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                user_keys = client.smembers("users")
                for user_key in user_keys:
                    # Extract user ID from the key
                    user_id = user_key.decode('utf-8').split(":")[1]
                    user_data = client.hgetall(user_key.decode('utf-8'))
                    user_data = {k.decode('utf-8'): v.decode('utf-8')
                                 for k, v in user_data.items()}
                    # Include the user ID in the data dictionary
                    user_data['id'] = user_id
                    all_users_with_id.append(user_data)
        except Exception as error:
            print(f"Error during the retrieval operation: {error}")

        return all_users_with_id

[MAIN.PY]

from RediRepository import RedisRepository

redisrepo = RedisRepository()

users_with_ids = redisrepo.select_all_users_with_id()
for user in users_with_ids:
    print(user)

If we run the file main.py, the following will be the result:

Select data in Redis with Python


DELETE USER:

from RediRepository import RedisRepository

redisrepo = RedisRepository()

users_with_ids = redisrepo.select_all_users_with_id()
for user in users_with_ids:
    print(user)
def delete_user_by_name(self, nameinput):
        """
        Deletes a user in Redis by their name.
        :param nameinput: The name of the user to delete.
        :return: True if the user was deleted, False otherwise.
        """
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                user_keys = client.smembers("users")
                for user_key in user_keys:
                    user_data = client.hgetall(user_key.decode('utf-8'))
                    user_data = {k.decode('utf-8'): v.decode('utf-8')
                                 for k, v in user_data.items()}
                    if user_data.get("name") == nameinput:
                        # Delete the user's hash
                        client.delete(user_key.decode('utf-8'))
                        # Remove the user key from the set of users
                        client.srem("users", user_key)
                        return True
        except Exception as error:
            print(f"Error during user deletion: {error}")

        return False

[MAIN.PY]

from RediRepository import RedisRepository

redisrepo = RedisRepository()

users = redisrepo.select_all_users()
for user in users:
    # print all fields
    print(user)

nametofind = "Name_1"
if redisrepo.delete_user_by_name(nametofind):
    print(f"User {nametofind} deleted successfully.")
else:
    print(f"User {nametofind} not found or error in deletion.")


nametofind = "Name_100"
if redisrepo.delete_user_by_name(nametofind):
    print(f"User {nametofind} deleted successfully.")
else:
    print(f"User {nametofind} not found or error in deletion.")


users = redisrepo.select_all_users()
for user in users:
    # print all fields
    print(user)

If we run the file main.py, the following will be the result:

Delete data in Redis with Python


UPDATE USER:

def update_user(self, user_id, updated_data):
        """
        Updates a user in Redis.
        :param user_id: The ID of the user to update.
        :param updated_data: A dictionary with the updated user data.
        :return: True if the update was successful, False otherwise.
        """
        user_key = f"user:{user_id}"
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                # Check if the user exists
                if not client.exists(user_key):
                    print(f"No user found with ID: {user_id}")
                    return False

                # Update user data
                client.hset(user_key, mapping=updated_data)
                return True
        except Exception as error:
            print(f"Error during user update: {error}")
            return False

[MAIN.PY]

from RediRepository import RedisRepository

redisrepo = RedisRepository()

# Name_1 -> 0f70825c-be0b-4fe7-b7d6-d3c7e61046dc
users = redisrepo.select_all_users()
for user in users:
    # print all fields
    print(user)

user_to_update = "0f70825c-be0b-4fe7-b7d6-d3c7e61046dc"
new_data = {
    "name": "Name_1_Plus",
    "age": "60",
    "city": "City_1_Plus"
}

if redisrepo.update_user(user_to_update, new_data):
    print(f"User {user_to_update} updated successfully.")
else:
    print(f"Failed to update user {user_to_update}.")

users = redisrepo.select_all_users()
for user in users:
    # print all fields
    print(user)

user_to_update = "AAAAAAAAAA"
new_data = {
    "name": "Name_1_Plus",
    "age": "60",
    "city": "City_1_Plus"
}

if redisrepo.update_user(user_to_update, new_data):
    print(f"User {user_to_update} updated successfully.")
else:
    print(f"Failed to update user {user_to_update}.")


users = redisrepo.select_all_users()
for user in users:
    # print all fields
    print(user)

If we run the file main.py, the following will be the result:

Update data in Redis with Python



Here, the complete version of the RedisRepository file:
[REDISREPOSITORY.PY]

import configparser
import redis
import uuid


class RedisRepository:
    redisclient = None
    server = None
    port = None
    database = None

    # method used to get the parameters to connect to Redis
    def _get_redis_parameters(self):
        # definition of the object used to read the config file
        configfile = configparser.ConfigParser()
        configfile.read("config.ini")

        rediscache = configfile["redis"]
        self.server = rediscache["server"]
        self.port = rediscache["port"]
        self.database = rediscache["db"]

    def __init__(self):
        # get Redis paramters
        self._get_redis_parameters()

    def insert_user(self, name, age, city):
        """
        Creates a new user in Redis with a GUID as the user ID.
        :param name: Name of the user.
        :param age: Age of the user.
        :param city: City of the user.
        :return: The GUID of the created user.
        """
        user_id = str(uuid.uuid4())  # Generate a GUID for the user ID
        user_key = f"user:{user_id}"
        user_data = {
            "name": name,
            "age": age,
            "city": city
        }

        try:
            # Using a context manager to handle the Redis connection
            # This ensures that the connection is properly closed after
            # the block of code is executed, even if an exception occurs:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                #  Hset is a command in Redis that is used to set the value of one or more
                # fields in a hash. It's a very versatile and commonly used command in Redis,
                # especially when dealing with objects or entities that have multiple attributes
                client.hset(user_key, mapping=user_data)
                # Add the user key to the set of users
                client.sadd("users", user_key)
                return user_id
        except Exception as error:
            print(f"Error during the insert operation, because of: {error}")
            return 0

    def select_all_users(self):
        """
        Retrieves all users from Redis.
        :return: A list of user data dictionaries.
        """
        all_users = []
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                # Retrieve all user keys from the set
                user_keys = client.smembers("users")
                for user_key in user_keys:
                    # Fetch each user's data
                    user_data = client.hgetall(user_key.decode('utf-8'))
                    # Convert bytes to string for each field
                    user_data = {k.decode('utf-8'): v.decode('utf-8')
                                 for k, v in user_data.items()}
                    all_users.append(user_data)
        except Exception as error:
            print(f"Error during the select all users operation: {error}")

        return all_users

    def select_user_by_name(self, nameinput):
        """
        Finds a user in Redis by their name.
        :param nameinput: The name of the user to find.
        :return: User data if found, else None.
        """
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                # Retrieve all user keys from the set
                user_keys = client.smembers("users")
                for user_key in user_keys:
                    # Fetch each user's data
                    user_data = client.hgetall(user_key.decode('utf-8'))
                    # Convert bytes to string for each field
                    user_data = {k.decode('utf-8'): v.decode('utf-8')
                                 for k, v in user_data.items()}

                    if user_data.get("name") == nameinput:
                        return user_data
        except Exception as error:
            print(f"Error during the select users by name operation: {error}")

        return None

    def delete_user_by_name(self, nameinput):
        """
        Deletes a user in Redis by their name.
        :param nameinput: The name of the user to delete.
        :return: True if the user was deleted, False otherwise.
        """
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                user_keys = client.smembers("users")
                for user_key in user_keys:
                    user_data = client.hgetall(user_key.decode('utf-8'))
                    user_data = {k.decode('utf-8'): v.decode('utf-8')
                                 for k, v in user_data.items()}
                    if user_data.get("name") == nameinput:
                        # Delete the user's hash
                        client.delete(user_key.decode('utf-8'))
                        # Remove the user key from the set of users
                        client.srem("users", user_key)
                        return True
        except Exception as error:
            print(f"Error during user deletion: {error}")

        return False

    def update_user(self, user_id, updated_data):
        """
        Updates a user in Redis.
        :param user_id: The ID of the user to update.
        :param updated_data: A dictionary with the updated user data.
        :return: True if the update was successful, False otherwise.
        """
        user_key = f"user:{user_id}"
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                # Check if the user exists
                if not client.exists(user_key):
                    print(f"No user found with ID: {user_id}")
                    return False

                # Update user data
                client.hset(user_key, mapping=updated_data)
                return True
        except Exception as error:
            print(f"Error during user update: {error}")
            return False

    def select_all_users_with_id(self):
        """
        Retrieves all users from Redis, including their IDs.
        :return: A list of dictionaries, each containing user data along with the user ID.
        """
        all_users_with_id = []
        try:
            with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
                user_keys = client.smembers("users")
                for user_key in user_keys:
                    # Extract user ID from the key
                    user_id = user_key.decode('utf-8').split(":")[1]
                    user_data = client.hgetall(user_key.decode('utf-8'))
                    user_data = {k.decode('utf-8'): v.decode('utf-8')
                                 for k, v in user_data.items()}
                    # Include the user ID in the data dictionary
                    user_data['id'] = user_id
                    all_users_with_id.append(user_data)
        except Exception as error:
            print(f"Error during the retrieval operation: {error}")

        return all_users_with_id



Leave a Reply

Your email address will not be published. Required fields are marked *