Creating an Apfell - Part 4
At the end of the last post we were starting to make some progress. We had a very basic RESTful interface that can communicate with a Postgres database and a basic templating engine to create a graphical user interface. To do this, we leveraged Jinja2 and Twitter Bootstrap for our user interface. If we go back to our original diagram, we’re only missing a few more pieces.
In this post, we’re going to talk a bit about a cool feature in Postgres, the LISTEN/NOTIFY commands, and how to combine it with websockets to make our environment a little more dynamic.
LISTEN / NOTIFY
The common paradigm here is called “publish-subscribe”: multiple workers “subscribe” to a single channel, and something “publishes” data to it. Everything that’s subscribed to that channel gets the data. Postgres calls publishing NOTIFY and subscribing LISTEN. It’s easiest if we walk through an example:
def pg_register_newcallback():
    # function that publishes each new row on the 'newcallback' channel
    create_function_on_callback_changes = """
    DROP FUNCTION IF EXISTS notify_newcallback() cascade;
    CREATE FUNCTION notify_newcallback() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN PERFORM pg_notify('newcallback', row_to_json(NEW)::text); RETURN NULL; END; $$;
    """
    # trigger that runs the function after every inserted row
    create_trigger_on_callback_changes = """
    CREATE TRIGGER newcallback_trigger AFTER INSERT ON callback FOR EACH ROW EXECUTE PROCEDURE notify_newcallback();
    """
    try:
        apfell_db.execute_sql(create_function_on_callback_changes)
    except Exception as e:
        print(e)
    try:
        apfell_db.execute_sql(create_trigger_on_callback_changes)
    except Exception as e:
        print(e)
I put this function in my models.py file. Then call it right under where you do Callback.create_table(True). The code within the Python strings is SQL that you can run directly from the command line within Postgres. If you want to go this route to test it, simply switch to the postgres user with sudo su - postgres, drop into interactive mode with psql, and select our database with \c apfell_db. Now you can run these commands. Postgres lets you create FUNCTIONs and TRIGGERs, among other things. Let’s cover the trigger part first.
CREATE TRIGGER newcallback_trigger AFTER INSERT ON callback FOR EACH ROW EXECUTE PROCEDURE notify_newcallback();
This creates a trigger called newcallback_trigger. Remember that a single INSERT operation can insert multiple rows. After a successful INSERT into the table callback, for each row that was added, the trigger executes a procedure called notify_newcallback(). Pretty straightforward. Now what about the first command?
CREATE FUNCTION notify_newcallback() RETURNS trigger LANGUAGE plpgsql as $$ BEGIN PERFORM pg_notify('newcallback', row_to_json(NEW)::text); RETURN NULL; END; $$;
This one is a little more complicated. First, it creates the function (procedure) we referenced in the previous command, notify_newcallback(). We expect this function to be called by our trigger, and according to the Postgres documentation, this means our function takes on some special properties: it must take no parameters and it must have a return type of trigger. Our language is plpgsql and we call the pg_notify function, which is the built-in way in Postgres to do a NOTIFY programmatically. Because this is an AFTER trigger, its return value is ignored, so we simply RETURN NULL.
As part of our special function, we get some special variables. In our case (inserting a new entry into the table), we get a variable called “NEW” which contains the data that was inserted. This blog also has a great walkthrough for doing something similar (and closer to where we’re going in the future). Depending on if you’re inserting, updating, or deleting from a table, the NEW and OLD parameters will contain different information.
pg_notify’s first parameter is the channel name that’ll be used in the LISTEN command. The second parameter is the payload we’re sending. In our case, we want the contents of the new row to be formatted as JSON, so we use another built-in function called row_to_json. The rest of the declaration is simply the opening and closing blocks that normal functions have.
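To make the two parameters a bit more concrete, here is a minimal in-memory sketch in Python of the channel/payload idea. It does not talk to Postgres at all; FakeNotifier, its methods, and the sample row are hypothetical names made up for illustration. The channel name decides who receives the message, and the payload is plain text that the listener parses back from JSON.

```python
import json
from collections import defaultdict


class FakeNotifier:
    """Toy stand-in for LISTEN/NOTIFY (hypothetical, in-memory only)."""

    def __init__(self):
        # channel name -> callbacks registered through listen()
        self._listeners = defaultdict(list)

    def listen(self, channel, callback):
        self._listeners[channel].append(callback)

    def notify(self, channel, payload):
        # Like pg_notify, the payload is just a string.
        for callback in self._listeners[channel]:
            callback(payload)


received = []
notifier = FakeNotifier()
notifier.listen("newcallback", lambda payload: received.append(json.loads(payload)))

# Roughly the kind of text row_to_json(NEW)::text might produce for a made-up row.
new_row = {"id": 1, "user": "alice", "host": "workstation-1"}
notifier.notify("newcallback", json.dumps(new_row))
notifier.notify("otherchannel", json.dumps({"id": 2}))  # nobody is listening here

print(received)
```

Only the message published on the newcallback channel reaches the listener; the one on the other channel is dropped because nothing subscribed to it.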
One way to test this out is to open up two terminal windows. Log in and connect to the database in both of them. In window1, create the function and trigger. In window2, execute LISTEN newcallback;. Now insert a new callback in window1. psql only displays pending notifications the next time it talks to the server, so in window2 run any sort of query, like SELECT 1;, and you’ll see the notification that there was an insert. What if we want to test this out in a little more of an operational way? More exciting, right? Let’s do it!
websockets
Websockets allow you to open up an interactive, event-driven, bi-directional communication stream without having to continuously poll for information. From the Sanic perspective, there is a decorator and function definition specifically for this:
@apfell.websocket('/ws/callbacks')
async def ws_callbacks(request, ws):
    pass
This new parameter, ws, allows us to do the interactive portion of websockets. We simply do await ws.send("data") to send data from Sanic, through the websocket, to the other end (which we haven’t covered yet). The next part of the Sanic side strays a little from websockets, but it’s required to make the proper connections back to the database. This is where we’ll register our listener in the Postgres LISTEN/NOTIFY paradigm.
@apfell.websocket('/ws/callbacks')
async def ws_callbacks(request, ws):
    async with aiopg.create_pool('dbname=apfell_db user=postgres password=postgres') as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute('LISTEN "newcallback";')
                # before we start getting new things, update with all of the old data
                callbacks = Callback.select()
                operators = Operator.select()
                callbacks_with_operators = await db_objects.prefetch(callbacks, operators)
                for cb in callbacks_with_operators:
                    await ws.send(js.dumps(cb.to_json()))
                # now pull off any new callbacks we got queued up while processing the old data
                while True:
                    msg = await conn.notifies.get()
                    await ws.send(msg.payload)
Ok, a lot going on here. I know. First, let’s talk about what we’re trying to do at a high level:
- Connect to the database
- Start Listening for new notifications
- Get all of our old callback information
- Send all of the old information to the other end of the websocket
- Catch any notification from the database and send it off to the other end of the websocket
We use aiopg to create another pool for doing asynchronous database connections. I wasn’t able to get the right level of access with the new peewee-async Manager interface (db_objects in our case), but I can still do it asynchronously with aiopg, so it’s ok for now. The cur.execute('LISTEN "newcallback";') line is what executes the Postgres command that registers us to get notifications. I do this before I query the database for the current callback information to hopefully prevent us from missing a callback.
If you remember back to the peewee section in Post 2, where we defined the table layout with our ORM, we made a foreign key in callback for an operator field. This was to allow us to tie a specific callback’s creation to a specific operator for accounting and logging purposes. We won’t always know this, but sometimes we will. Now, if you just do a standard query for all Callbacks, you’ll get an error (because we error on attempted synchronous database queries). Wait, where is any of this synchronous? It’s under the hood, unfortunately: when you query for all callbacks normally, there will be a subsequent, synchronous call to get the operator information. That’s the issue. Luckily, peewee-async has a way to fix this called prefetch. We give the Manager more information about what will be needed in the upcoming query so it can make those calls asynchronously.
When this returns, we simply send all of the data to the other end of the websocket. After we’ve sent all of that information forward, we enter an infinite loop that just gets new notifications and sends them off. Because we’re doing this asynchronously, Sanic can easily process other work while we wait for the conn.notifies.get() call to return with data.
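The ordering described above (subscribe first, replay the old rows, then drain the queue) can be sketched without a database. This is only an illustration under stated assumptions: an asyncio.Queue stands in for conn.notifies, and stream_callbacks, fetch_existing, send, and the None sentinel are all hypothetical names invented for the demo, not part of aiopg or Sanic.

```python
import asyncio


async def stream_callbacks(notifications, fetch_existing, send):
    """Replay existing rows, then forward queued and future notifications.

    `notifications` stands in for conn.notifies. Because the subscription
    exists before the backlog query runs, anything that arrives during the
    query waits in the queue instead of being lost.
    """
    for row in await fetch_existing():
        await send(row)
    while True:
        msg = await notifications.get()
        if msg is None:  # sentinel used only by this demo to end the loop
            break
        await send(msg)


async def demo():
    sent = []
    notifications = asyncio.Queue()
    backlog_started = asyncio.Event()

    async def fetch_existing():
        # Simulate a new callback arriving while the backlog query is running.
        await notifications.put("callback-3")
        backlog_started.set()
        return ["callback-1", "callback-2"]

    async def send(item):
        sent.append(item)

    task = asyncio.create_task(stream_callbacks(notifications, fetch_existing, send))
    await backlog_started.wait()
    await notifications.put("callback-4")  # arrives after the backlog was sent
    await notifications.put(None)
    await task
    return sent


result = asyncio.run(demo())
print(result)
```

One thing this ordering trades off: a row inserted between the LISTEN and the backlog query could show up both in the old data and as a notification, so a client may want to de-duplicate on the row's id.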
Ok, that was all on the internal Sanic side. But something has to actually call our /ws/callbacks path, right? That’s going to be the front-end JavaScript. Remember how we had a spot in our Jinja2 template for script blocks? Now’s the time!
function startwebsocket(){
    var ws = new WebSocket('ws://127.0.0.1/ws/callbacks');
    ws.onmessage = function(event){
        // each message is one callback serialized as JSON
        var cb = JSON.parse(event.data);
        console.log(cb);
    };
    ws.onclose = function(){
        console.log("socket closed");
    };
    ws.onerror = function(){
        console.log("websocket error");
    };
    ws.onopen = function(event){
        console.debug("opened");
    };
}
startwebsocket();
So, this isn’t too crazy, right? Overall, we create one function and call it. Inside the function, we create a new websocket and connect it to the route we created in Sanic. I’m doing this all locally, so I reach back to localhost for this, but ideally you’d host this somewhere with a static IP or domain name. The main part we’re concerned about is ws.onmessage. This handler is triggered automatically whenever we call ws.send("data") from the web server side (Sanic). We are doing everything in JSON, so we know that the event.data we receive will be JSON text. To easily handle this in JavaScript, we just use JSON.parse. From here we simply log it to the console.
Now, there is something to consider. In our python Model for a callback we had some timestamps. These were like:
init_callback = p.DateTimeField(default=datetime.datetime.now, null=False)
last_checkin = p.DateTimeField(default=datetime.datetime.now, null=False)
This datetime stuff gets passed throughout, but a Python datetime object can’t be serialized to JSON directly; it’s just a Python-ism. We can fix this though! If you look back at our initial ORM, you’ll notice a few interesting lines in our to_json function:
r['init_callback'] = r['init_callback'].strftime('%m/%d/%Y %H:%M:%S')
r['last_checkin'] = r['last_checkin'].strftime('%m/%d/%Y %H:%M:%S')
We convert the timestamps to formatted strings when building the actual JSON representation. This is required for the data to be serialized and then correctly parsed on the JavaScript side. Just keep in mind as you go through this sort of stuff that there tends to be a bunch of little discrepancies between languages.
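Here is a small standalone sketch of that discrepancy using only the standard library. The row dictionary and its values are made up for illustration; the format string is the same one used in to_json.

```python
import datetime
import json

# A made-up row holding a raw datetime, similar to what the model stores.
row = {
    "id": 1,
    "init_callback": datetime.datetime(2018, 3, 1, 14, 30, 0),
}

try:
    json.dumps(row)
    error = None
except TypeError as exc:
    # json.dumps does not know how to encode datetime objects
    error = str(exc)

# Convert to a string first, with the same format string as to_json.
row["init_callback"] = row["init_callback"].strftime("%m/%d/%Y %H:%M:%S")
encoded = json.dumps(row)

print(error)
print(encoded)
```

The first json.dumps call raises a TypeError, while the second succeeds once the timestamp is an ordinary string.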
You should be good to go from here though. When you start up your server.py, your pg_register_newcallback() function will create the appropriate FUNCTION and TRIGGER on your database. When you browse to the web page where you put the above JavaScript, it will automatically connect to your Sanic websocket route and register a LISTENer. Now, if you insert a new row into your callback table (either straight from the database or using your nifty RESTful APIs), you’ll get a message in your browser’s console with that information right away. You’ve now got an asynchronous mechanism to go from any insert into a database all the way through real-time processing in a browser! Cool, right? You also have a database and Python representation of your data, a RESTful API, and a dynamic templating engine to display pretty web pages to users!
This means that there are only two main pieces left for us to cover in the next post: Vue and sanic-auth. But we’re almost there!