Python – Management of a MongoDB database

By | 06/12/2023

In this post, we will see how to manage a MongoDB database with Python using pymongo.
Specifically, we will perform CRUD operations on a document called ‘User’, defined as follows:

{
  "name": string, 
  "age": int, 
  "city": string
}


We start by writing a Docker Compose file that creates a container running an instance of MongoDB:

version: '3'
services:
  # MongoDB container definition
  dockermongo:
    # Docker image name
    image: mongo
    # admin username and password
    environment:
      - MONGO_INITDB_ROOT_USERNAME=admindb
      - MONGO_INITDB_ROOT_PASSWORD=pass123
    # volume definition
    volumes:
      - dbmongo:/data/db
    ports:
      - 27017:27017
volumes:
  dbmongo:
    driver: local


We run it with the command docker-compose up -d, which creates and starts the container.

Then, with the command docker ps, we can check that the container is running.
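
Besides docker ps, we can also check from Python that the published port accepts connections. The following is only a minimal sketch using the standard library: the function name port_is_open is hypothetical, and the defaults assume the host and port used in the Docker Compose file above. It confirms that something is listening on the port, not that the credentials are valid.

```python
import socket


def port_is_open(host: str = "localhost", port: int = 27017, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError if the connection is refused or times out
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("MongoDB port reachable:", port_is_open())
```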


Now, we install the pymongo library:

py -m pip install pymongo


We have installed all the components, so we can start writing the Python code.

First of all, we create a config file containing the MongoDB connection parameters:
[CONFIG.INI]

[mongodb]
server = localhost
user = admindb
password = pass123
database = dbuser
port = 27017


Then, we define a class called ‘User’ that represents the document in MongoDB:
[USER.PY]

class User:
    def __init__(self, nameinput, ageinput, cityinput):
        self.name = nameinput
        self.age = ageinput
        self.city = cityinput
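
As an alternative, the same class could be written as a dataclass, so that the conversion to a MongoDB document does not have to be typed by hand. This is only a sketch under our own assumptions: the method to_document is hypothetical and the field names are taken from the document definition at the top of the post.

```python
from dataclasses import dataclass, asdict


@dataclass
class User:
    name: str
    age: int
    city: str

    def to_document(self) -> dict:
        # asdict builds {"name": ..., "age": ..., "city": ...} from the fields
        return asdict(self)


if __name__ == "__main__":
    print(User("TestName", 44, "London").to_document())
```

The positional constructor stays the same, so User("TestName", 44, "London") keeps working.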


Finally, we create a file called ‘MongoRepository.py’ with a class that manages all the CRUD operations:
[MONGOREPOSITORY.PY]

import configparser
from pymongo import MongoClient, ASCENDING
from User import User


class MongoRepository:
    connectionstring = None
    dbname = None
    client = None
    db = None
    collectionusers = None

    # method that reads config.ini, builds the MongoDB connection string
    # and stores the database name
    def _getconnection(self):
        # definition of the object used to read the config file
        configfile = configparser.ConfigParser()
        configfile.read("config.ini")

        mongodb = configfile["mongodb"]
        server = mongodb["server"]
        user = mongodb["user"]
        password = mongodb["password"]
        database = mongodb["database"]
        port = mongodb["port"]

        self.connectionstring = f"mongodb://{user}:{password}@{server}:{port}"
        self.dbname = database

    def __init__(self):
        # get connection string
        self._getconnection()
        # definition of client, db and collection
        self.client = MongoClient(self.connectionstring)
        self.db = self.client[self.dbname]
        # get the collection called 'users'
        self.collectionusers = self.db['users']
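
A side note on the connection string: if the username or the password contains characters such as '@' or ':', they have to be percent-escaped before being placed in the URI. The following is a minimal sketch and not part of the repository above: the function name build_connection_string and the sample password are hypothetical, and the config is read from a string only to keep the example self-contained.

```python
import configparser
from urllib.parse import quote_plus

# hypothetical config content, with a password that needs escaping
SAMPLE_CONFIG = """
[mongodb]
server = localhost
user = admindb
password = p@ss:123
database = dbuser
port = 27017
"""


def build_connection_string(config_text: str) -> str:
    # interpolation=None keeps a literal '%' in the password from being
    # treated as configparser interpolation syntax
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    mongodb = parser["mongodb"]

    # escape reserved characters in the credentials
    user = quote_plus(mongodb["user"])
    password = quote_plus(mongodb["password"])
    return f"mongodb://{user}:{password}@{mongodb['server']}:{mongodb['port']}"


if __name__ == "__main__":
    print(build_connection_string(SAMPLE_CONFIG))
```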



INSERT NEW USERS:

    def saveusers(self, lstuser):
        self.collectionusers.insert_many(lstuser)
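
The call to insert_many also returns a result object whose inserted_ids attribute lists the ids of the new documents. As a sketch, a variant of the method could return how many documents were written. The function name save_users is hypothetical, and the collection is passed as a parameter only to keep the example independent from the repository class.

```python
def save_users(collection, users):
    """Insert all users with one call and return how many were written."""
    users = list(users)
    if not users:
        # insert_many does not accept an empty list, so we skip the call
        return 0
    result = collection.insert_many(users)
    return len(result.inserted_ids)
```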

[MAIN.PY]

from MongoRepository import MongoRepository
import random

# creating instance of MongoRepository
repo = MongoRepository()

# Preparing 100 documents to insert
lstusers = [
    {
        "name": f"User{i}",
        "age": random.randint(18, 60),  # Random age between 18 and 60
        # Random city
        "city": random.choice(["London", "Rome", "Paris", "Monaco", "Amsterdam"])
    }
    for i in range(100)
]

try:
    # save list of users
    repo.saveusers(lstusers)
    print("All data added in MongoDB")
except Exception as error:
    print("Something went wrong")
    print(error)

If we run the file main.py, the following will be the result:


    def saveuser(self, user):
        newuser = {"name": user.name, "age": user.age, "city": user.city}
        self.collectionusers.insert_one(newuser)

[MAIN.PY]

from MongoRepository import MongoRepository
from User import User

# creating instance of MongoRepository
repo = MongoRepository()

user = User("TestName", 44, "London")
user2 = User("TestName2", 56, "Rome")
user3 = User("TestName3", 23, "Amsterdam")

repo.saveuser(user)
repo.saveuser(user2)
repo.saveuser(user3)

repo.selectall()

If we run the file main.py, the following will be the result:



SELECT USERS:

    def selectall(self):
        for user in self.collectionusers.find().sort('name', ASCENDING):
            print(
                f"{user.get('name', 'N/A')} - {user.get('age', 'N/A')} - {user.get('city', 'N/A')}")
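
If we prefer to get the rows back instead of printing them, the same query can feed a list. This is only a sketch: the function name format_users is hypothetical, the collection is passed in as a parameter, and the sort direction is written as 1, which is the value of pymongo.ASCENDING.

```python
def format_users(collection):
    """Return one 'name - age - city' line per user, sorted by name."""
    # the projection {"_id": 0} leaves out the ObjectId, which is not shown anyway
    cursor = collection.find({}, {"_id": 0}).sort("name", 1)
    return [
        f"{doc.get('name', 'N/A')} - {doc.get('age', 'N/A')} - {doc.get('city', 'N/A')}"
        for doc in cursor
    ]
```

The caller can then print the lines, count them or write them to a file.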

[MAIN.PY]

from MongoRepository import MongoRepository

# creating instance of MongoRepository
repo = MongoRepository()

repo.selectall()

If we run the file main.py, the following will be the result:


    def selectforagegreater50(self):
        for user in self.collectionusers.find({"age": {"$gt": 50}}).sort('name', ASCENDING):
            print(
                f"{user.get('name', 'N/A')} - {user.get('age', 'N/A')} - {user.get('city', 'N/A')}")
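
The filter above hard-codes the value 50. As a sketch, the filter could be built by a small helper that accepts the bounds as parameters. The function name age_filter and its parameters are hypothetical, while $gt and $lt are the standard MongoDB comparison operators.

```python
def age_filter(older_than=None, younger_than=None):
    """Build a MongoDB filter on 'age' from optional exclusive bounds."""
    bounds = {}
    if older_than is not None:
        bounds["$gt"] = older_than
    if younger_than is not None:
        bounds["$lt"] = younger_than
    # with no bounds we return an empty filter, which matches every document
    return {"age": bounds} if bounds else {}


if __name__ == "__main__":
    print(age_filter(older_than=50))
```

The returned dictionary can be passed to find() in place of the literal filter.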

[MAIN.PY]

from MongoRepository import MongoRepository

# creating instance of MongoRepository
repo = MongoRepository()

repo.selectforagegreater50()

If we run the file main.py, the following will be the result:



DELETE USERS:

    def deleteallusers(self):
        self.collectionusers.delete_many({})

[MAIN.PY]

from MongoRepository import MongoRepository

# creating instance of MongoRepository
repo = MongoRepository()

repo.deleteallusers()

repo.selectall()

If we run the file main.py, the following will be the result:


    def deleteuserswithagelessvalueinput(self, age):
        self.collectionusers.delete_many({"age": {"$lt": age}})
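
The call to delete_many returns a result object with a deleted_count attribute, so we can know how many documents were removed without running a second query. A minimal sketch, with a hypothetical function name and the collection passed as a parameter:

```python
def delete_users_younger_than(collection, age):
    """Delete every user whose age is below the given value and return the count."""
    result = collection.delete_many({"age": {"$lt": age}})
    return result.deleted_count
```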

[MAIN.PY]

from MongoRepository import MongoRepository

# creating instance of MongoRepository
repo = MongoRepository()

repo.selectall()
print("\n\n")

repo.deleteuserswithagelessvalueinput(40)

repo.selectall()

If we run the file main.py, the following will be the result:



UPDATE USER:

    def updateuser(self, user):
        # update only the age of the first document with a matching name
        self.collectionusers.update_one({"name": user.name}, {
            "$set": {"age": user.age}})
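
Since update_one changes at most one document and does nothing when no name matches, it can be useful to report whether a user was found. The result object exposes matched_count for this purpose. The following is a sketch with a hypothetical function name and the collection passed as a parameter:

```python
def update_user_age(collection, name, age):
    """Set the age of the first user with this name; return True if one was found."""
    result = collection.update_one({"name": name}, {"$set": {"age": age}})
    # matched_count is 0 when no document has this name
    return result.matched_count == 1
```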

[MAIN.PY]

from MongoRepository import MongoRepository
from User import User

# creating instance of MongoRepository
repo = MongoRepository()

repo.selectforagegreater50()
print("\n\n")

user = User("TestName2", 18, "Rome")
repo.updateuser(user)
repo.selectall()

If we run the file main.py, the following will be the result:


