160 changes: 160 additions & 0 deletions binder/inter_process_producers_consumer.py
@@ -0,0 +1,160 @@
import os
import socket
import time

# This file provides functions to create an inter-process producers-consumer system. The basic
# concept is that the *produce* method can be called to ensure that a given 'payload' is
# 'consumed' in another process: the consumer process. If the consumer process is not yet
# running, it will also be started by the *produce* method. After consuming the message, it
# will wait for payloads of future producers and it will quit when it hasn't received anything
# for a certain amount of time.
#
# This system is used to maintain our RabbitMQ connection: the consumer will create a RabbitMQ
# connection and publish all payloads it gets from the producers to RabbitMQ. This avoids the
# need to create and destroy a RabbitMQ connection for each payload.


# The producers and the consumer will run on the same machine, so we should use localhost
"""
The host at which the consumer will listen for incoming producer connections
"""
HOST = '127.0.0.1'

# We might want to stop hardcoding the port at some point and use some environment variable instead,
# but this is not as easy as it looks: the consumer will run in a different process and thus won't
# be able to read the environment variables, so the port will have to be propagated some other way.
"""
The port at which the consumer will listen for incoming producer connections
"""
PORT = 22102

"""
The consumer will stop when it hasn't received anything for *MAX_CONSUMER_WAIT_TIME* seconds
"""
MAX_CONSUMER_WAIT_TIME = 5

"""
Retries the given *task* up to *num_attempts* times until it succeeds (i.e. *task* returns True).
If every attempt fails, a RuntimeError is raised.

After each failed attempt, it waits between *min_wait_time* and *max_wait_time* seconds before
the next attempt. The wait after the first failure is the shortest and grows with every further
attempt (but never exceeds *max_wait_time*).

This behavior is nice because it retries a first failure quickly while reducing the system load
when many retries are needed.
"""
def _retry(task, min_wait_time, max_wait_time, num_attempts):
for attempt_counter in range(num_attempts):
if task():
return
else:
time.sleep(min_wait_time + (max_wait_time - min_wait_time) * (attempt_counter / num_attempts))
raise RuntimeError('Reached maximum number of retries')
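
# For illustration: with the values that *produce* below passes to _retry (min_wait_time=0.01,
# max_wait_time=1.0, num_attempts=10), the sleep after failed attempt k is
# 0.01 + 0.99 * (k / 10) seconds, so roughly 0.01s, 0.11s, 0.21s, ... up to about 0.90s after
# the tenth and final failure, right before the RuntimeError is raised.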

"""
The number of characters (well... bytes) used to encode the length of each producer payload.

The producer writes the payload length in the first *NUM_LENGTH_CHARS* bytes and writes the
actual payload thereafter. This is needed to let the consumer know how long each payload is.

The value is currently 8 characters, which allows payloads of just under 10^8 bytes; that
should be more than we will ever need.
"""
NUM_LENGTH_CHARS = 8
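
# For illustration: a 42-byte payload is framed on the wire as the zero-padded length prefix
# followed by the payload itself, i.e. (assuming a UTF-8 text payload)
#
#     b'00000042' + payload_bytes
#
# The consumer first reads the NUM_LENGTH_CHARS-byte prefix and then reads the payload itself.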

"""
Tries to start the consumer.

This method will try to open a TCP server socket at port *PORT*. If that succeeds, it will
call *consumer_setup()*. The result of *consumer_setup()* is the consumer.

Then, it will listen for incoming socket connections from the producers. For each incoming
connection, it will call *consume(consumer, payload)* where *payload* is the payload received
from the incoming connection. (Currently, it allows only 1 payload per producer connection.)

When no incoming connections have been made for *MAX_CONSUMER_WAIT_TIME* seconds, the server
socket will be closed and it will call *consumer_shutdown(consumer)*.
"""
def _run_consumer(consumer_setup, consume, consumer_shutdown):
from struct import pack

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:

        # This sockopt causes the server socket port to be freed instantly after the server
        # socket is closed. If we did NOT do this, the entire producer-consumer system would
        # hang when a payload is produced soon after a consumer has stopped.
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, pack('ii', 1, 0))
server_socket.bind((HOST, PORT))
server_socket.settimeout(MAX_CONSUMER_WAIT_TIME)
consumer = consumer_setup()
server_socket.listen()

try:
while True:
client_connection, _ = server_socket.accept()
with client_connection:
payload_length = int(str(client_connection.recv(NUM_LENGTH_CHARS), 'utf-8'))
payload = str(client_connection.recv(payload_length), 'utf-8')
consume(consumer, payload)
except socket.timeout:
consumer_shutdown(consumer)

"""
Calls _run_consumer and silently catches any OSError it may raise (for example when the port
is still claimed by another consumer).
"""
def try_run_consumer(consumer_setup, consume, consumer_shutdown):
try:
_run_consumer(consumer_setup, consume, consumer_shutdown)
except OSError:
pass


"""
Performs a single attempt to produce the given *payload*. If the consumer can NOT be reached, it
will try to start a new consumer.
"""
def _try_produce(payload, consumer_path, consumer_parameters):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.connect((HOST, PORT))
payload_length = str(len(payload))
if len(payload_length) > NUM_LENGTH_CHARS:
raise RuntimeError('Payload is too large')
while len(payload_length) < NUM_LENGTH_CHARS:
payload_length = '0' + payload_length
client_socket.sendall(bytes(payload_length, 'utf-8'))
client_socket.sendall(bytes(payload, 'utf-8'))
return True
    except (ConnectionRefusedError, TimeoutError):

# If the connection failed, the consumer is probably not running, so we should try to
# start it (in a different process).
consumer_process = os.popen('python3 ' + str(consumer_path.absolute()) + ' ' + consumer_parameters, mode="w")
consumer_process.detach()
return False


"""
Ensures that the given *payload* is 'consumed' in another process. The *consumer_path* should
point to a .py file that calls the *try_run_consumer* function of this module. If the consumer
is not yet running, this method will basically execute "python3 consumer_path consumer_parameters"
in a new process.
"""
def produce(payload, consumer_path, consumer_parameters):

# Since the _try_produce method is rather fragile, we may need to retry it a couple of times.
# It is fragile because:
#
    # (1) The consumer may or may not be running. If it is not running, _try_produce will
    # obviously fail, but it will try to create a new consumer, so there is a good chance the
    # next attempt will succeed.
#
# (2) When a new consumer is created, the OS scheduler might not start it immediately, so
# the second and third attempt may also fail.
#
# (3) When the existing consumer is quitting, the produce attempt will also fail, and so will
# its attempt to create a new consumer (since the port is still claimed by the quitting
# consumer). This will cause the second attempt to fail as well, and maybe also the third and
# fourth attempt.
_retry(lambda: _try_produce(payload, consumer_path, consumer_parameters), 0.01, 1.0, 10)
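
# Illustrative sketch of how this module is meant to be used. The names my_consumer.py,
# my_setup, my_consume and my_shutdown are hypothetical placeholders; binder/rabbitmq.py is
# the real consumer used in this repository. The script passed as *consumer_path* calls
# *try_run_consumer*:
#
#     # my_consumer.py
#     from inter_process_producers_consumer import try_run_consumer
#
#     def my_setup():
#         return []                        # whatever state the consumer needs
#
#     def my_consume(consumer, payload):
#         consumer.append(payload)         # handle a single payload
#
#     def my_shutdown(consumer):
#         pass                             # release resources here
#
#     if __name__ == '__main__':
#         try_run_consumer(my_setup, my_consume, my_shutdown)
#
# A producer process would then call, for example:
#
#     produce('hello', pathlib.Path('my_consumer.py'), '')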
108 changes: 108 additions & 0 deletions binder/rabbitmq.py
@@ -0,0 +1,108 @@
import pathlib
import pika

from threading import Semaphore, Thread
from time import sleep


"""
The absolute path to this file. This is needed for the inter-process producers-consumer system.
"""
RABBITMQ_CONSUMER_PATH = pathlib.Path(__file__).absolute()


"""
This function runs on the RabbitMQ thread of the consumer process. It creates a RabbitMQ
connection and publishes every payload that the main consumer thread hands to it,
communicating with that thread through semaphores.

The RabbitMQ connection is managed in a separate thread to keep the connection responsive while
the main thread is busy interacting with potentially slow producers.
"""
def _rabbitmq_thread_function(consumer):
from sys import argv

(produced_payload_semaphore, consumed_payload_semaphore, produced_payload) = consumer

rabbitmq_username = argv[1]
rabbitmq_password = argv[2]
rabbitmq_host = argv[3]

connection_credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_password)
connection_parameters = pika.ConnectionParameters(rabbitmq_host, credentials=connection_credentials)

connection = pika.BlockingConnection(parameters=connection_parameters)
channel = connection.channel()

# To communicate a stop message, the length of produced_payload will be set to 0
while len(produced_payload) == 1:

# To prevent the connection from being closed due to not responding, we don't wait
# on the semaphore indefinitely, but instead process data events every 0.01 seconds
# or after each publish.
has_task = produced_payload_semaphore.acquire(timeout=0.01)
if has_task:
channel.basic_publish('hightemplar', routing_key='*', body=produced_payload[0])
consumed_payload_semaphore.release()
connection.process_data_events(0)

connection.close()

"""
Creates the consumer and starts the RabbitMQ thread. It uses two semaphores for
synchronization and a single-element list to communicate the payload (the list is shared
between the threads and the payload is passed by changing its only element).
"""
def _consumer_setup():
produced_payload_semaphore = Semaphore(0)
consumed_payload_semaphore = Semaphore(0)
produced_payload = [None]

consumer = (produced_payload_semaphore, consumed_payload_semaphore, produced_payload)

rabbitmq_thread = Thread(target=lambda: _rabbitmq_thread_function(consumer))
# Make it a daemon thread to prevent it from outliving the main thread in case of
# unexpected errors. (But it should close the RabbitMQ connection properly in normal
# circumstances.)
    rabbitmq_thread.daemon = True
rabbitmq_thread.start()

return consumer

"""
Consumes a payload from a producer and propagates it to the RabbitMQ thread. It blocks until
the payload has been published by the RabbitMQ thread.
"""
def _consume(consumer, payload):
(produced_payload_semaphore, consumed_payload_semaphore, produced_payload) = consumer

produced_payload[0] = payload
produced_payload_semaphore.release()
consumed_payload_semaphore.acquire()
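
# For illustration, a single payload flows through the consumer roughly like this:
#
#     main thread (_consume)                    RabbitMQ thread (_rabbitmq_thread_function)
#     ----------------------                    -------------------------------------------
#     produced_payload[0] = payload
#     produced_payload_semaphore.release()
#                                               produced_payload_semaphore.acquire() succeeds
#                                               channel.basic_publish(...)
#                                               consumed_payload_semaphore.release()
#     consumed_payload_semaphore.acquire()
#       returns, so _consume returns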

"""
Sends the stop signal to the RabbitMQ thread
"""
def _consumer_shutdown(consumer):
(_, _, produced_payload) = consumer

# This will change the length of produced_payload to zero, which is the stop signal for
# the RabbitMQ thread
produced_payload.pop()

# Give the RabbitMQ thread some time to observe the signal and cleanly close the connection
sleep(0.1)

"""
Starts the RabbitMQ consumer
"""
def main():
from inter_process_producers_consumer import try_run_consumer

try_run_consumer(_consumer_setup, _consume, _consumer_shutdown)

# IMPORTANT: only call main() in the actual consumer process. NOT when the main process just
# tries to access *RABBITMQ_CONSUMER_PATH*
if __name__ == '__main__':
main()
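
# For illustration: *produce* in binder/websocket.py effectively starts this consumer by
# running something like
#
#     python3 /path/to/binder/rabbitmq.py <username> <password> <host>
#
# where the three positional arguments are read back from sys.argv by _rabbitmq_thread_function.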
25 changes: 11 additions & 14 deletions binder/websocket.py
@@ -1,6 +1,8 @@
from django.conf import settings

from .json import jsondumps
from binder.inter_process_producers_consumer import produce
from binder.rabbitmq import RABBITMQ_CONSUMER_PATH
import requests
from requests.exceptions import RequestException

@@ -29,23 +31,18 @@ def list_rooms_for_user(self, user):

return rooms

# WARNING: When this is non-empty, the entire RabbitMQ connection will be mocked
mock_trigger_listeners = []
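
# Illustrative test usage (hypothetical test code, not part of this module), assuming the
# test settings contain a HIGH_TEMPLAR['rabbitmq'] entry so that trigger() takes the
# RabbitMQ branch:
#
#     from binder import websocket
#
#     captured = []
#     websocket.mock_trigger_listeners.append(captured.append)
#     try:
#         ...  # exercise code that calls trigger(); the payloads end up in *captured*
#     finally:
#         websocket.mock_trigger_listeners.clear()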

def trigger(data, rooms):
if 'rabbitmq' in getattr(settings, 'HIGH_TEMPLAR', {}):
import pika
from pika import BlockingConnection

connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'],
settings.HIGH_TEMPLAR['rabbitmq']['password'])
connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'],
credentials=connection_credentials)
connection = BlockingConnection(parameters=connection_parameters)
channel = connection.channel()

channel.basic_publish('hightemplar', routing_key='*', body=jsondumps({
'data': data,
'rooms': rooms,
}))
global mock_trigger_listeners
if len(mock_trigger_listeners) > 0:
for trigger_listener in mock_trigger_listeners:
trigger_listener(jsondumps({ 'data': data, 'rooms': rooms }))
else:
rabbitmq_consumer_args = settings.HIGH_TEMPLAR['rabbitmq']['username'] + ' ' + settings.HIGH_TEMPLAR['rabbitmq']['password'] + ' ' + settings.HIGH_TEMPLAR['rabbitmq']['host']
produce(jsondumps({ 'data': data, 'rooms': rooms }), RABBITMQ_CONSUMER_PATH, rabbitmq_consumer_args)
if getattr(settings, 'HIGH_TEMPLAR_URL', None):
url = getattr(settings, 'HIGH_TEMPLAR_URL')
try:
2 changes: 2 additions & 0 deletions ci-requirements.txt
@@ -8,3 +8,5 @@ codecov
coverage
django-hijack<3.0.0
openpyxl
fasteners
pika
4 changes: 3 additions & 1 deletion project/packages.pip
@@ -3,4 +3,6 @@ Pillow
django-request-id
psycopg2
requests
openpyxl
openpyxl
fasteners
pika
2 changes: 2 additions & 0 deletions setup.py
@@ -41,6 +41,8 @@
'Pillow >= 3.2.0',
'django-request-id >= 1.0.0',
'requests >= 2.13.0',
'fasteners >= 0.17.3',
'pika >= 1.1.0'
],
tests_require=[
'django-hijack >= 2.1.10, < 3.0.0',