01/05/2019

Write a Postgres proxy. Day 3. Return selected rows.

RediSQL, SQL steroids for Redis, is a very fast in-memory SQL engine. Its main features are:
  1. Speed: up to 130,000 inserts per second.
  2. Familiarity: it supports standard SQL, no weird dialects.
  3. Simplicity: it is very easy to operate and to use, with bindings for any language.
Code on GitHub: RedBeardLab/rediSQL
 

During the last session we worked on extracting the query string from the messages of the PG protocol. Then we focused on INSERT queries, making it possible to insert data into RediSQL through the PG protocol.

Today we are going to focus on SELECT queries: we want to read values from RediSQL and return them to the client using the PG protocol.

Fetch all or use cursors?

There are two main alternatives when working with SELECT queries in RediSQL.

  1. We can fetch all the results in one go, using commands like REDISQL.EXEC (generic, for any kind of query) or REDISQL.QUERY (specific to read-only queries).
  2. We can stream the result of a selection into a Redis stream using REDISQL.QUERY.INTO, and then fetch a few rows at a time using the standard stream commands.

While fetching all the results together would be simpler, in this case we opted for the cursor approach. Fetching everything in one go wouldn’t provide information about the type of each field, information that is fortunately available with the stream approach.

RowDescription message

In order to reply to a SELECT statement with rows from the database, the server needs to first describe those rows to the client. This is done with the RowDescription message.

Honestly, this is the most ambiguously documented message I have found in the documentation up to now.

Fortunately, it seems that setting all the values to zero is enough for PSQL to accept it as a valid message without complaints.

Following the usual convention, the message starts with a letter for identification (T in this case), followed by the length of the whole message, and then

  1. an integer that specifies the number of fields that this message is describing, and
  2. the description of all those fields.

I am unsure about several properties in the description of the fields; however, setting them all to zero seems to be OK. The only property that we didn’t leave at zero is the data size, set to 8 for integers and to a negative value (meaning variable size) for strings. The format of the field is set explicitly as well, but its value is still 0, since that is the code for the textual representation.

The actual implementation is here.

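def RowDescription(rows):
    """Build a RowDescription ('T') message from (type, name) tuples."""
    body = b""
    for rowType, rowName in rows:
        fieldName = bytes(rowName, "utf-8") + b'\x00'  # NUL-terminated name
        tableId = bytes(4)     # 0: not a column of a specific table
        columnId = bytes(2)
        dataTypeId = bytes(4)  # left at 0, PSQL accepts it
        if rowType == "int":
            dataSize = (8).to_bytes(2, byteorder="big")
        else:
            # any other type, "null" included, is declared as variable size (-1)
            dataSize = (-1).to_bytes(2, byteorder="big", signed=True)
        typeModifier = bytes(4)
        formatCode = (0).to_bytes(2, byteorder="big")  # 0 = text format
        body += fieldName + tableId + columnId + dataTypeId + dataSize + typeModifier + formatCode
    # the length counts itself (4 bytes) and the field count (2 bytes), not the 'T'
    totalLen = len(body) + 4 + 2
    totalLenBytes = totalLen.to_bytes(4, byteorder="big")
    totalFieldsBytes = len(rows).to_bytes(2, byteorder="big")
    return b'T' + totalLenBytes + totalFieldsBytes + body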
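
One way to check the layout is to decode a message back into its fields. The following is only a sketch and not part of the proxy: parse_row_description is a hypothetical helper, and the sample message is built by hand with the struct module, following the same all-zero convention described above.

```python
import struct

# Fixed-size tail of each field: table OID (int32), column number (int16),
# type OID (int32), type size (int16), type modifier (int32), format (int16).
FIELD_TAIL = struct.Struct("!ihihih")


def parse_row_description(message):
    """Decode a RowDescription message into a list of field dicts."""
    assert message[0:1] == b"T"
    length, field_count = struct.unpack_from("!ih", message, 1)
    assert length == len(message) - 1  # the length excludes the type byte
    offset = 7  # 1 type byte + 4 length bytes + 2 field-count bytes
    fields = []
    for _ in range(field_count):
        end = message.index(b"\x00", offset)  # the name is NUL-terminated
        name = message[offset:end].decode("utf-8")
        _, _, _, size, _, fmt = FIELD_TAIL.unpack_from(message, end + 1)
        fields.append({"name": name, "size": size, "format": fmt})
        offset = end + 1 + FIELD_TAIL.size
    return fields


# Hand-built sample: an 8-byte integer column "a" and a variable-size column "b".
body = b"a\x00" + FIELD_TAIL.pack(0, 0, 0, 8, 0, 0)
body += b"b\x00" + FIELD_TAIL.pack(0, 0, 0, -1, 0, 0)
sample = b"T" + struct.pack("!ih", len(body) + 6, 2) + body
print(parse_row_description(sample))
```

Reading the size as a signed 16-bit integer shows why the two 255 bytes used for strings mean "variable": they decode to -1.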

DataRow message

After describing the shape of each row in the result set, the next obvious step is to send all the rows.

This is done using the DataRow message.

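def DataRow(row):
    """Build a DataRow ('D') message from one stream entry."""
    body = b""
    for fieldType, fieldValue in row.items():
        typeField, nameField = fieldType.decode("utf-8").split(":", 1)
        if typeField == "int":
            valueBytes = bytes(fieldValue.decode(), "utf-8")
            # the length counts only the value bytes, there is no NUL terminator
            length = len(valueBytes).to_bytes(4, byteorder="big")
            body += length + valueBytes
        else:
            # unsupported types are sent as NULL (length -1, no value), so the
            # field count below still matches the RowDescription
            body += (-1).to_bytes(4, byteorder="big", signed=True)
    totalLen = len(body) + 4 + 2
    totalLenBytes = totalLen.to_bytes(4, byteorder="big")
    totalFieldsBytes = len(row).to_bytes(2, byteorder="big")
    return b'D' + totalLenBytes + totalFieldsBytes + body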

The convention is always the same: a starting byte (D in this case) to indicate the type of message, the total length of the message, the number of fields in this row, and finally the body of the message, which holds the values of the row.

As you can see from the code, we support only integers at the moment. The integer is decoded and re-encoded as UTF-8 and then appended to the body.
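
Strings and NULLs could be handled along the same lines. The following is a sketch that assumes text format for every column; encode_data_row is a hypothetical helper, not the proxy's code. In the protocol each column value is prefixed by its length in bytes, and a length of -1 marks a NULL, with no value bytes after it.

```python
import struct


def encode_data_row(values):
    """Build a DataRow message from a list of str, int or None values."""
    body = b""
    for value in values:
        if value is None:
            body += struct.pack("!i", -1)  # NULL: length -1, no value bytes
        else:
            data = str(value).encode("utf-8")  # text format
            body += struct.pack("!i", len(data)) + data
    # the length covers itself (4 bytes), the field count (2 bytes) and the body
    header = struct.pack("!ih", len(body) + 6, len(values))
    return b"D" + header + body


message = encode_data_row([3, None, "abc"])
print(message)
```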

Wrapping all together

The final part of today is about putting it all together.

The first step after identifying a SELECT query is to execute it into a stream: we randomly generate a stream name and pass the query to REDISQL.QUERY.INTO.

elif firstToken.upper() == "SELECT":
    stream = random_stream()
    result = self.redis.execute_command("REDISQL.QUERY.INTO", stream, self.db, query)
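
The random_stream helper is not shown in this post. One possible implementation, purely as an assumption, derives the name from a UUID so that concurrent queries do not write into the same stream; the pgproxy: prefix is made up for this example.

```python
import uuid


def random_stream(prefix="pgproxy:"):
    """Return a stream name that is very unlikely to collide with another one."""
    return prefix + uuid.uuid4().hex


print(random_stream())
```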

Then we analyze the first returned row in order to construct the RowDescription message.

streamResult = self.redis.execute_command("XREAD",  "COUNT", "1", "STREAMS", stream, "0")
firstRow = streamResult[0][1][0][1]
rows = []
for key, _ in firstRow.items():
    rowType, rowName = key.decode("utf-8").split(':')
    rowType, rowName = rowType.strip(), rowName.strip()
    rows.append((rowType, rowName,))
self.transport.write(RowDescription(rows))

The row in the stream contains both the type of the column and the name of the column, separated by a colon (:), then the actual value. Something like the following.

1) 1) "1549811093979-0"
   2) 1) "int:a"
      2) "1"
      3) "null:b"
      4) "(null)"
2) 1) "1549811093979-1"
   2) 1) "int:a"
      2) "3"
      3) "int:b"
      4) "4"

The odd elements are the type and the name of the column, for example int:a (an integer in column a), while the even ones are the actual values of the fields.
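
The pairing of keys and values can be sketched as follows, assuming the entry is a dict with bytes keys and values, like firstRow in the snippet above; parse_entry is a hypothetical helper.

```python
def parse_entry(entry):
    """Turn {b"int:a": b"1", ...} into a list of (type, name, value) tuples."""
    columns = []
    for key, raw in entry.items():
        # Split only on the first colon, in case the column name contains one.
        col_type, col_name = key.decode("utf-8").split(":", 1)
        value = raw.decode("utf-8")
        if col_type == "null":
            value = None  # the stream stores NULL as the text "(null)"
        elif col_type == "int":
            value = int(value)
        columns.append((col_type, col_name, value))
    return columns


print(parse_entry({b"int:a": b"1", b"null:b": b"(null)"}))
# prints [('int', 'a', 1), ('null', 'b', None)]
```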

Now that we know how to read the data from the stream, the last step is to actually return the rows.

We iterate the stream and from each row we create and send a DataRow message.

returnedRows = self.redis.xread({stream: "0"})[0][1]
for _, row in returnedRows:
    self.transport.write(DataRow(row))
self.transport.write(CommandComplete("SELECT"))

Finally, we complete the workflow by sending the classical ReadyForQuery message.
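
The helpers that build these two closing messages are not shown in this post, so the following is only a sketch of how they look on the wire according to the protocol documentation; the names command_complete and ready_for_query are hypothetical. For a SELECT, the documented command tag is SELECT followed by the number of rows.

```python
import struct


def command_complete(tag):
    """CommandComplete: 'C', length, then the NUL-terminated command tag."""
    payload = tag.encode("utf-8") + b"\x00"
    return b"C" + struct.pack("!i", len(payload) + 4) + payload


def ready_for_query(status=b"I"):
    """ReadyForQuery: 'Z', length 5, then one transaction status byte.

    b"I" means idle, that is, not inside a transaction block.
    """
    return b"Z" + struct.pack("!i", 5) + status


print(command_complete("SELECT 2") + ready_for_query())
```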

Concluding

In this post we analyzed the workflow for answering SELECT queries.

We start by getting the first row and using it to build the RowDescription message, then we iterate over each row in the result set, building and sending a DataRow message for each one.

If you liked this article, make sure to follow me on Twitter and to subscribe to the mailing list.

All posts of this series: Writing a Postgres proxy.

