Skip to content

Message Queues

Thread-safe message queues for buffering and processing messages.

Classes

InMemoryQueue

Thread-safe in-memory FIFO queue.

Methods: - put(item) - Add item to queue - get(block=True, timeout=None) - Get item from queue - task_done() - Mark task as complete - join() - Wait until all tasks are done - qsize() - Return queue size - empty() - Return True if queue is empty

Example:

from kn_sock.queue import InMemoryQueue
import threading

queue = InMemoryQueue()

# Producer thread
def producer():
    for i in range(5):
        queue.put(f"message-{i}")

# Consumer thread  
def consumer():
    while True:
        item = queue.get()
        print(f"Processing: {item}")
        queue.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer, daemon=True).start()
queue.join()  # Wait for all tasks to complete

FileQueue(path)

Persistent file-based queue that survives restarts.

Parameters: - path (str): File path for queue storage

Methods: - put(item) - Add item to queue (saves to disk) - get(block=True, timeout=None) - Get item from queue - task_done() - Mark task as complete - join() - Wait until all tasks are done - close() - Save queue state and close - qsize() - Return queue size - empty() - Return True if queue is empty

Example:

from kn_sock.queue import FileQueue

# Create persistent queue
queue = FileQueue("messages.db")

# Add messages
queue.put("message 1")
queue.put("message 2")

# Process messages
try:
    while not queue.empty():
        item = queue.get(timeout=1)
        print(f"Processing: {item}")
        queue.task_done()
except:
    pass

queue.close()  # Save state

Usage with Socket Servers

from kn_sock import start_tcp_server
from kn_sock.queue import InMemoryQueue
import threading

message_queue = InMemoryQueue()

def message_handler(data, addr, socket):
    # Add incoming messages to queue
    message_queue.put((data, addr, socket))

def message_processor():
    # Process messages from queue
    while True:
        data, addr, socket = message_queue.get()

        # Process the message
        response = f"Processed: {data.decode()}"
        socket.sendall(response.encode())

        message_queue.task_done()

# Start background processor
threading.Thread(target=message_processor, daemon=True).start()

# Start server
start_tcp_server(8080, message_handler)