In this article, we are going to build a REST API to perform CRUD operations.
To build this server we will use Robyn and the Postgres driver Psycopg2.
Robyn
Robyn is a fast async Python web framework coupled with a web server written in Rust.
Psycopg2
Psycopg is the most popular PostgreSQL database adapter for the Python programming language. Its main features are a complete implementation of the Python DB API 2.0 specification and thread safety (several threads can share the same connection). It was designed for heavily multi-threaded applications that create and destroy lots of cursors and make a large number of concurrent INSERTs or UPDATEs.
Requirements
Python knowledge
Basic SQL knowledge
Installation
pip install robyn psycopg2-binary python-dotenv
Project Structure
robyn-postgres-demo/
    app.py
    controllers.py
    helpers.py
    init_db.py
Creating a table
First, we have to create a database. The code in this article connects to a database named robyn_db.
In the psql shell, we run the following command:
CREATE DATABASE robyn_db;
init_db.py
import os

import psycopg2
from dotenv import load_dotenv

# Read the variables defined in the .env file
load_dotenv()
USER = os.getenv('USERNAME')
PASSWORD = os.getenv('PASSWORD')

def get_db_connection():
    # Open a new connection to the robyn_db database
    return psycopg2.connect(
        dbname="robyn_db",
        user=USER,
        password=PASSWORD
    )

conn = get_db_connection()
cur = conn.cursor()
cur.execute('DROP TABLE IF EXISTS books;')
cur.execute('CREATE TABLE books (id serial PRIMARY KEY,'
            'title varchar (150) NOT NULL,'
            'author varchar (50) NOT NULL,'
            'date_added date DEFAULT CURRENT_TIMESTAMP);'
            )
Inside the init_db.py file, we load our environment variables to get access to Postgres. Then we open a connection, initialize a cursor to perform database operations, and create a table named books.
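One detail worth checking here: by default, load_dotenv() does not overwrite variables that already exist in the environment, and some systems (Windows, for example) already define USERNAME for the logged-in user. The sketch below is only an illustration of reading credentials and failing early when one is missing. The names DB_USER and DB_PASSWORD and the read_db_settings() helper are hypothetical and not part of the original project.

```python
import os


def read_db_settings(env=os.environ):
    """Return the database credentials found in `env`.

    DB_USER and DB_PASSWORD are hypothetical names chosen for this sketch;
    the article's code reads USERNAME and PASSWORD instead.
    """
    missing = [name for name in ("DB_USER", "DB_PASSWORD") if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {"user": env["DB_USER"], "password": env["DB_PASSWORD"]}


# Passing a plain dict makes the function easy to try without a .env file.
settings = read_db_settings({"DB_USER": "postgres", "DB_PASSWORD": "secret"})
print(settings)
```

In a real project the function would be called with no arguments after load_dotenv(), so it reads from os.environ.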
Inserting data into the table
init_db.py
cur.execute('INSERT INTO books (title, author) '
            'VALUES (%s, %s)',
            ('A Tale of Two Cities',
             'Charles Dickens')
            )
cur.execute('INSERT INTO books (title, author) '
            'VALUES (%s, %s)',
            ('Anna Karenina',
             'Leo Tolstoy')
            )
conn.commit()
cur.close()
conn.close()
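Because controllers.py imports from init_db.py, the code above runs every time we start the server: it drops the books table, recreates it, and inserts these two rows again.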
controllers.py
from init_db import get_db_connection

def all_books():
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('SELECT * FROM books;')
    books = cur.fetchall()
    cur.close()
    conn.close()
    return books
This function retrieves all the rows in the database.
app.py
We create a new file, app.py, to write our endpoints. We will start by writing an endpoint to retrieve all the rows in the database.
from robyn import Robyn
from controllers import all_books

app = Robyn(__file__)

@app.get("/books")
async def books():
    books = all_books()
    return {"status_code": 200, "body": books, "type": "json"}

app.start(port=8000, url="0.0.0.0")
The all_books() function retrieves all the rows in the database, but it returns them as a list of tuples. We need the function to return a list of JSON objects instead.
[(1, 'A Tale of Two Cities', 'Charles Dickens', datetime.date(2023, 2, 22)), (2, 'Anna Karenina', 'Leo Tolstoy', datetime.date(2023, 2, 22))]
We have to create a file with helpers, to transform data into dictionaries so the endpoints can return data as JSON.
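To see why the raw rows need converting, the short check below tries to serialize a row shaped like the ones above with the standard json module. It is a minimal sketch that runs without a database; the sample row is copied from the output shown earlier.

```python
import datetime
import json

# A row shaped like the ones psycopg2 returns for the books table.
rows = [(1, 'A Tale of Two Cities', 'Charles Dickens', datetime.date(2023, 2, 22))]

try:
    json.dumps(rows)
    serialized = True
except TypeError as error:
    # datetime.date has no default JSON representation.
    serialized = False
    print(error)
```

Even without the date column, a tuple would be serialized as a JSON array, so the column names would be lost.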
helpers.py
import collections

def to_dict(psycopg_tuple: tuple):
    # Map each column of a books row to its name
    book_dict = collections.OrderedDict()
    book_dict['id'] = psycopg_tuple[0]
    book_dict['title'] = psycopg_tuple[1]
    book_dict['author'] = psycopg_tuple[2]
    # Dates are not JSON serializable, so format them as strings
    book_dict['datetime'] = psycopg_tuple[3].strftime("%m/%d/%Y")
    return book_dict

def list_dict(rows: list):
    row_list = []
    for row in rows:
        book_dict = to_dict(row)
        row_list.append(book_dict)
    return row_list
The to_dict() function takes a tuple as a parameter and transforms it into an ordered dictionary, so the position of the key-value pairs will not change.
The list_dict() function takes a list as a parameter. We use it to convert a list of tuples to a list of dictionaries.
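As a side note, regular dictionaries have preserved insertion order since Python 3.7, so the same ordering guarantee is available without OrderedDict. The sketch below is an alternative, not the article's helper: row_to_dict() and COLUMNS are hypothetical names, and the column order is assumed to match the books table created earlier.

```python
import datetime

# Column order assumed to match the books table defined earlier.
COLUMNS = ("id", "title", "author", "datetime")


def row_to_dict(row):
    """Pair each column name with its value; plain dicts keep insertion order."""
    record = dict(zip(COLUMNS, row))
    # Format the date as a string so the result is JSON serializable.
    record["datetime"] = record["datetime"].strftime("%m/%d/%Y")
    return record


row = (2, 'Anna Karenina', 'Leo Tolstoy', datetime.date(2023, 2, 22))
book = row_to_dict(row)
print(book)
```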
Controllers
In controllers.py, we are going to write all the functions to perform CRUD operations.
All the records
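import json

from helpers import list_dict, to_dict
from init_db import get_db_connection

def all_books():
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('SELECT * FROM books;')
    # Convert the list of tuples to a list of dictionaries
    books = list_dict(cur.fetchall())
    cur.close()
    conn.close()
    return books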
The all_books() function retrieves all the records in the database.
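Every controller opens a connection, uses a cursor, and closes both by hand, so an exception in between would leave them open. One possible way to make the cleanup automatic is contextlib.closing from the standard library. The sketch below uses an in-memory SQLite database as a stand-in for Postgres, purely so that it runs without a server; with psycopg2 the same pattern would wrap the connection returned by get_db_connection().

```python
import sqlite3
from contextlib import closing


def get_db_connection():
    # Stand-in for the article's psycopg2 connection: an in-memory SQLite
    # database, used only so this sketch runs without a Postgres server.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE books (id INTEGER PRIMARY KEY, title TEXT, author TEXT)")
    conn.execute("INSERT INTO books (title, author) VALUES ('Anna Karenina', 'Leo Tolstoy')")
    return conn


def all_books():
    # closing() calls .close() on exit, even if the query raises.
    with closing(get_db_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("SELECT * FROM books;")
        return cur.fetchall()


books = all_books()
print(books)
```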
Creating a record
def new_book(title: str, author: str):
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('INSERT INTO books (title, author) '
                'VALUES (%s, %s) RETURNING *;',
                (title, author))
    book = cur.fetchone()[:]
    book_dict = to_dict(book)
    conn.commit()
    cur.close()
    conn.close()
    return json.dumps(book_dict)
The new_book() function takes title and author as parameters and inserts the values into the database. The RETURNING * clause gives back the row that was just added; the function converts it to a dictionary and returns it as JSON.
Retrieving by ID
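def book_by_id(id: int):
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        # Query parameters must be a sequence, so pass a one-element tuple
        cur.execute('SELECT * FROM books WHERE id = %s', (id,))
        book = cur.fetchone()
        if book is None:
            return None
        return json.dumps(to_dict(book))
    except Exception:
        return None
    finally:
        # Release the cursor and connection whether or not the query worked
        cur.close()
        conn.close()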
The book_by_id() function takes id as a parameter. With this function, we retrieve a row by its ID and return it as JSON. If there is no row with the ID passed, the function returns None.
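A detail that is easy to miss when passing a single value to cur.execute(): psycopg2 expects the query parameters as a sequence (or a mapping), and in Python parentheses alone do not create a tuple. The short check below illustrates the difference using plain Python, with no database involved.

```python
single = (5)      # parentheses only group: this is the int 5
one_tuple = (5,)  # the trailing comma makes a one-element tuple

print(type(single).__name__, type(one_tuple).__name__)

# A string is itself a sequence of characters, so "12" offers two
# items where the query has a single %s placeholder.
print(len("12"), len(("12",)))
```

This is why a single ID is best written as (id,) in the second argument of execute().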
Updating a record
def update_book(title: str, author: str, id: int):
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('UPDATE books SET title = %s, author = %s WHERE id = %s RETURNING *;',
                (title, author, id))
    book = cur.fetchone()[:]
    book_dict = to_dict(book)
    conn.commit()
    cur.close()
    conn.close()
    return json.dumps(book_dict)
We use the update_book() controller to update the title and author of a row. The function returns the updated row as JSON.
Deleting a record
def delete_book(id: int):
    conn = get_db_connection()
    cursor = conn.cursor()
    # Query parameters must be a sequence, so pass a one-element tuple
    cursor.execute("DELETE FROM books WHERE id = %s", (id,))
    conn.commit()
    cursor.close()
    conn.close()
    return "Book deleted"
We pass the ID of a row to delete_book() to delete that row.
Endpoints
In the app.py file, we import all the functions from controllers.py and write all the endpoints.
POST endpoints
from robyn import Robyn
from controllers import all_books, new_book, book_by_id, delete_book, update_book
import json

app = Robyn(__file__)

@app.post("/book")
async def create_book(request):
    body = bytearray(request['body']).decode("utf-8")
    json_body = json.loads(body)
    try:
        book = new_book(json_body['title'], json_body['author'])
        return {"status_code": 201, "body": book, "type": "json"}
    except:
        return {"status_code": 500, "body": "Internal Server Error", "type": "text"}

app.start(port=8000, url="0.0.0.0")
This endpoint creates a new row. The function extracts the body from the request and parses it as JSON. Then it passes the values to the new_book() function and returns 201 as the status code, with the created row as the body.
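In the endpoint above, the body is parsed before the try block, so a malformed body or a missing field is not handled as a client error. The sketch below shows one way the decoding and validation could be isolated in a small function. parse_book_payload() is a hypothetical helper, and it assumes the body arrives as a sequence of byte values, as the bytearray(request['body']) call suggests.

```python
import json


def parse_book_payload(raw_body):
    """Decode a request body and return (title, author), or None if invalid.

    `raw_body` is assumed to be a sequence of byte values, matching the
    bytearray(request['body']) call used in the endpoint.
    """
    try:
        data = json.loads(bytearray(raw_body).decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        # Covers invalid UTF-8 and invalid JSON (JSONDecodeError is a ValueError).
        return None
    if not isinstance(data, dict):
        return None
    title, author = data.get("title"), data.get("author")
    if not isinstance(title, str) or not isinstance(author, str):
        return None
    return title, author


good = parse_book_payload(list(b'{"title": "Emma", "author": "Jane Austen"}'))
bad = parse_book_payload(list(b'{"title": "Emma"}'))
print(good, bad)
```

An endpoint using such a helper could answer with a 400 status code when the result is None, instead of failing before it reaches its error handler.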
GET endpoints
@app.get("/books")
async def books():
    books = all_books()
    return books
This endpoint executes the all_books() function to return all the rows in the database.
@app.get("/book/:id")
async def get_book(request):
    id = request['params']['id']
    book = book_by_id(id)
    try:
        if book is None:
            return {"status_code": 404, "body": "Book not Found", "type": "text"}
        else:
            return {"status_code": 200, "body": book, "type": "json"}
    except:
        return {"status_code": 500, "body": "Internal Server Error", "type": "text"}
This endpoint extracts the ID parameter from the request and passes it to the book_by_id() function. If the search succeeds, it returns a status code 200 and the row as the body. If no book matches the ID passed, it returns a status code 404 and the message "Book not Found" as a response.
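Path parameters typically arrive as strings, so a request such as /book/abc reaches the database layer with a value that can never match an integer ID. The sketch below shows one way to validate the value first. parse_book_id() is a hypothetical helper, not part of the original code.

```python
def parse_book_id(raw_id):
    """Return the ID as a positive int, or None when it is not a valid ID."""
    try:
        book_id = int(raw_id)
    except (TypeError, ValueError):
        return None
    # serial columns start at 1, so zero and negatives cannot match a row.
    return book_id if book_id > 0 else None


print(parse_book_id("12"), parse_book_id("abc"), parse_book_id("0"))
```

An endpoint could call this helper before querying and return a 404 (or 400) response right away when it gets None.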
PUT endpoint
@app.put("/book/:id")
async def update(request):
    id = request['params']['id']
    body = bytearray(request['body']).decode("utf-8")
    json_body = json.loads(body)
    title = json_body['title']
    author = json_body['author']
    book_id = book_by_id(id)
    if book_id is None:
        return {"status_code": 404, "body": "Book not Found", "type": "text"}
    else:
        try:
            book = update_book(title, author, id)
            return {"status_code": 200, "body": book, "type": "json"}
        except:
            return {"status_code": 500, "body": "Internal Server Error", "type": "text"}
First, this function checks that the ID passed matches a row in the database; if it does not, it returns a status code 404. Otherwise, it passes the values to the update_book() function to update the row. If the operation succeeds, it returns a status code 200 and the updated row as a response.
DELETE endpoint
@app.delete("/book/:id")
async def delete(request):
    id = request['params']['id']
    book_id = book_by_id(id)
    if book_id is None:
        return {"status_code": 404, "body": "Book not Found", "type": "text"}
    else:
        try:
            delete_book(id)
            # The body is a plain message, so the type is text
            return {"status_code": 200, "body": "Book deleted", "type": "text"}
        except:
            return {"status_code": 500, "body": "Internal Server Error", "type": "text"}

app.start(port=8000, url="0.0.0.0")
In this endpoint, the function extracts the ID parameter and looks for the row in the database. If the row exists, it passes the ID to the delete_book() function to delete it; otherwise, it returns a status code 404.
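The lookup before the delete costs an extra query. A possible alternative is to run the DELETE directly and inspect the cursor's rowcount attribute, which is part of the Python DB API 2.0 specification, to learn whether a row was removed. The sketch below uses an in-memory SQLite database as a stand-in so it runs without a Postgres server; delete_book_checked() is a hypothetical name, and with psycopg2 the placeholder would be %s instead of ?.

```python
import sqlite3


def delete_book_checked(conn, book_id):
    """Delete one book and report whether a row was actually removed."""
    cur = conn.cursor()
    # sqlite3 uses ? placeholders; psycopg2 uses %s.
    cur.execute("DELETE FROM books WHERE id = ?", (book_id,))
    deleted = cur.rowcount == 1
    conn.commit()
    cur.close()
    return deleted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE books (id INTEGER PRIMARY KEY, title TEXT, author TEXT)")
conn.execute("INSERT INTO books (title, author) VALUES ('Anna Karenina', 'Leo Tolstoy')")

first = delete_book_checked(conn, 1)   # the row exists
second = delete_book_checked(conn, 1)  # the row is already gone
print(first, second)
conn.close()
```

With this approach, the endpoint could return 404 when the function reports that nothing was deleted.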
Conclusion
This was my first time using the Psycopg2 driver. It was easy to use thanks to its documentation.
I was interested in creating an app with Robyn and a database, and I found that psycopg2 was an easy option to integrate with Robyn.
I hope this article encourages more developers to try Robyn and create apps with it.
Thank you for taking the time to read this article.
If you have any recommendations about other packages, architectures, how to improve my code, my English, or anything else, please leave a comment or contact me through Twitter or LinkedIn.
The source code is here.