Creating an Apfell - Part 2

7 minute read

Published:

At the end of the last post we ended up with a very basic RESTful interface to work with some ‘operators’ and ‘callbacks’ by leveraging Python 3’s asyncio (async and await) and the Sanic project. From the first post, we said there are three main parts for this section of our project:

  • Database
  • Web Server (Sanic)
  • User Interface

The next steps we’ll discuss are the database and the webserver-database boundary. For our purposes, I’ll be using Postgres for the database because it has a nice feature as of version 9 that I want to leverage. I’ll cover this feature later in this post though.

Peewee - ORM

We are writing our program in Python, and I would like to keep it that way. That means avoiding writing SQL commands as much as possible. One way to do this is to leverage an ORM (Object Relational Model) that works as an intermediary between Python objects and SQL queries/objects. I used peewee as my ORM so that I can leverage peewee-async to actually connect to my database. Let’s see how this could work:

import peewee as p
import datetime

class Operator(p.Model):
  username = p.CharField(max_length=64, unique=True, null=False)
  password = p.CharField(max_length=1024, null=False)
  
class Callback(p.Model):
  init_callback = p.DateTimeField(default=datetime.datetime.now, null=False)
  last_checkin = p.DateTimeField(default=datetime.datetime.now, null=False)
  user = p.CharField(max_length=64, null=False)
  host = p.CharField(max_length=255, null=False)
  pid = p.IntegerField(null=False)
  ip = p.CharField(max_length=100, null=False)
  description = p.CharField(max_length=1024, null=True)
  operator = p.ForeignKeyField(Operator, null=True)

Operator.create_table(True)
Callback.create_table(True)

There are some important things to note here. Each class represents a different table in our database, and each member of that class represents a column in that table. We specify unique=True to indicate that column must have all unique values. Similarly, null=False means that column cannot be null in a row. I put this code in models.py within the database_models folder to keep everything organized. You can see here that I have a ForeignKeyField in Callback for Operator. This allows us to have a one-to-many relationship where an operator can have many callbacks associated with them. This just describes the database, but we still need to actually connect to it.

Peewee-async

peewee-async allows us to connect to our database while keeping our program asynchronous. In __init__.py, add the following:

import uvloop
from peewee_async import Manager, PooledPostgresqlDatabase

dbloop = uvloop.new_event_loop()
apfell_db = PooledPostgresqlDatabase('apfell_db', user='postgres', password='postgres')
apfell_db.connect_async(loop=dbloop)
db_objects = Manager(apfell_db, loop=dbloop)

and server.py becomes:

from app import apfell, dbloop, apfell_db, db_objects
import asyncio

if __name__ == "__main__":
  asyncio.set_event_loop(dbloop)
  server = apfell.create_server(host='0.0.0.0', port=80)
  loop = asyncio.get_event_loop()
  task = asyncio.ensure_future(server)
  db_objects.database.allow_sync = False  # raise AssertionError on ANY sync call
  try:
      loop.run_until_complete(apfell_db.connect_async(loop=dbloop))
      loop.run_forever()
  except:
      loop.stop()

Ok, that was a lot of confusing code. Let’s step through it a little. Asynchronous python relies on event loops. This is what allows Python to act like it has many different threads running within a single process without actually having to spin up other threads and all the headaches that come with threading. This also relies on installing postgresql on your linux machine, with a database called apfell_db with username postgres and password postgres . You should definitely change these for your installation, but they’re the defaults here. Manager is a peewee-async mechanism specifically for handling database IO in an asynchronous fashion while still allowing the normal peewee ORM to work underneath. I also set db_objects.database.allow_sync = False while I’m doing development so that I know exactly when I accidentally try to make a synchronous connection to the database (note This will cause the program to crash). We get our current event loop with asyncio.get_event_loop(), add our server as a task in that loop asyncio.ensure_future(server), make sure we successfully connect to the database loop.run_until_complete(apfell_db.connect_async(loop=dbloop)), and finally start running our server forever loop.run_forever().

This combined with our previous section will, upon successfully connecting to the database, try to make two tables with columns defined as we specified. The (True) part of those create_table calls will cause silent fails if the tables already exist. It’s important to note that right now I have no migration capabilities. So, if you change the structure of your database, you will need to drop the modified tables before running your program again.

Manual database connection

If you need to manually connect to your database, you can do so with psql from the command line. You can then connect to our database with \c apfell_db. \dt will allow you to see the tables within the database. \d+ operator will allow you to see all the columns and schema information for the operator table. If you want to input information manually, you can do the following: insert into operator (username, password) values ('user1', 'pass1');. If you need to drop a table, simply run drop table callback cascade;.

Peewee* in action

Now lets actually try this out. Manually insert some operators into the operator table. Let’s update our API to get these operators instead of the ones we hard-coded into our server. Modify Server/app/routes/routes.py:

from app import apfell, db_objects
from app.database_models.model import Operator

@apfell.route("/api/v1.0/operators/", methods=['GET'])
async def get_all_operators(request):
  ops = await db_objects.execute(Operator.select())
  return json([p.to_json() for p in ops])

@apfell.route("/api/v1.0/operators/<id:int>", methods=['GET'])
async def get_one_operator(request, id):
  try:
      op = await db_objects.get(Operator, id=id)
      return json(str(op))
  except:
      print("Failed in /api/v1.0/operators/id for a GET request")
      return abort(404)

This uses the db_objects Manager to make an asynchronous request to the postgresql server for us. This is why you also see an await before the call. The Operator.select() call allows us to select all operators where as the db_objects.get(Operator, id=id) call allows us to get a single specific operator. If you try to get a specific operator and that operator doesn’t exist, peewee-async will throw an exception - this is why we surround it with a try/except block. You’ll also notice that we are trying the results of these database calls as if they were just python objects. This makes everything pretty seamless from our end. There’s just one thing though - how do you print the string form or json form of an arbitrary database object? No idea. So, we’ll have to specify that ourselves. Time to update the model!

class Operator(p.Model):
  username = p.CharField(max_length=64, unique=True, null=False)
  password = p.CharField(max_length=1024, null=False)

  class Meta():
      database = apfell_db

  def to_json(self):
      r = {}
      for k in self._data.keys():
          try:
              r[k] = getattr(self, k)
          except:
              r[k] = json.dumps(getattr(self, k))
      return r

  def __str__(self):
      return str(self.to_json())

class Callback(p.Model):
  init_callback = p.DateTimeField(default=datetime.datetime.now, null=False)
  last_checkin = p.DateTimeField(default=datetime.datetime.now, null=False)
  user = p.CharField(max_length=64, null=False)
  host = p.CharField(max_length=255, null=False)
  pid = p.IntegerField(null=False)
  ip = p.CharField(max_length=100, null=False)
  description = p.CharField(max_length=1024, null=True)
  operator = p.ForeignKeyField(Operator, null=True)

  class Meta():
      database = apfell_db

  def to_json(self):
      r = {}
      for k in self._data.keys():
          try:
              if k == 'operator':
                  r[k] = (getattr(self, k)).to_json()
              else:
                  r[k] = getattr(self, k)
          except:
              r[k] = json.dumps(getattr(self, k))
      r['init_callback'] = r['init_callback'].strftime('%m/%d/%Y:%H:%M:%S')
      r['last_checkin'] = r['last_checkin'].strftime('%m/%d/%Y:%H:%M:%S')
      return r

  def __str__(self):
      return str(self.to_json())

The __str__(self) function is called when you try to turn either Operator or Callback into a string. You can format this to include whatever you want. Similarly, we define a to_json function which we call in our api method so that we can continue our RESTful API. Additionally, the inner class Meta() and corresponding database = apfell_db makes sure that these models are associated with our apfell_db database.

To update an object in the database, do something like:

op = await db_objects.get(Operator, id=id)
op.username = "new_hax0r"
await db_objects.update(op)

And to create an object, do something like:

op = await db_objects.create(Operator, username="username_here", password="pass1234@")

Lastly, to delete an object:

op = await db_objects.get(Operator, id=id)
await db_objects.delete(op)

I’ll leave it as an exercise to finish this up for Operator and to implement the corresponding part for Callback.