Source code for stiqueue.sqclient

"""
This module implements a simple client for interacting with a message queue server.

Classes:
    SQClient: A client that connects to the message queue server to enqueue, dequeue, and check the count of messages.
"""

import socket
import sys
import time
import logging
import os


[docs]class SQClient: """ A client that connects to a message queue server for enqueuing, dequeuing, and retrieving the count of messages. Attributes: host (str): The server's host address. port (int): The port number to connect to the server. socket (socket.socket): The client socket to communicate with the server. buff_size (int): Buffer size for sending and receiving messages. logger (logging.Logger): Logger for printing messages. ack_required (bool): Indicates whether an acknowledgment is required after the client receives the message. """ def __init__(self, host="127.0.0.1", port=1234, logger=None, buff_size=None, ack_required=True): """ Initializes the SQClient with the specified parameters. Args: host (str): The server's host address. Defaults to "127.0.0.1". port (int): The port number to connect to the server. Defaults to 1234. logger (logging.Logger, optional): Logger for logging messages. If None, a default logger is created. buff_size (int, optional): Buffer size for sending and receiving messages. Defaults to None. ack_required (bool): Indicates whether an acknowledgment is required after the client receives the message. """ self.host = host self.port = port self.ack_required = ack_required if host is None: self.host = socket.gethostname() self.socket = None self.buff_size = buff_size if logger is None: logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) logger.addHandler(ch) self.logger = logger
[docs] def connect(self): """ Establishes a connection to the messaging queue server. """ self.logger.debug("Connecting to %s %d" % (self.host, self.port)) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if self.buff_size: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.buff_size) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.buff_size) else: self.buff_size = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) self.socket.connect((self.host, self.port))
[docs] def send_with_action(self, msg, action, recv=False): """ Sends a message with a specified action to the server. Args: msg (bytes or str): The message to send. If not in bytes, it will be encoded. action (bytes): The action command (e.g., "enq", "deq", "cnt"). recv (bool): Whether to expect a response from the server. Defaults to False. Returns: bytes: The server's response if recv is True, otherwise None. """ total_ret_val = None if not isinstance(msg, bytes): msg = msg.encode() req = action+msg self.logger.debug("send with action: ") self.logger.debug(req) self.connect() self.socket.sendall(req) if recv: while True: ret_val = self.socket.recv(self.buff_size) self.logger.debug(f"DEBUGGING recv: <{ret_val}>") if total_ret_val is None: total_ret_val = ret_val else: total_ret_val += ret_val if ret_val in [b'', '']: self.logger.debug(f"DEBUGGING: empty") break elif ret_val is None: self.logger.debug(f"DEBUGGING: time to break") break elif len(ret_val) < self.buff_size: self.logger.debug(f"DEBUGGING: ret val is smaller than buff size") break self.disconnect() return total_ret_val
[docs] def enq(self, msg): """ Sends an "enqueue" request to the server. Args: msg (bytes or str): The message to enqueue. If not in bytes, it will be encoded. """ self.send_with_action(msg, b"enq")
[docs] def deq(self): """ Sends a "dequeue" request to the server and receives the dequeued message. Returns: bytes: The dequeued message from the server. """ msg = self.send_with_action(b"", b"deq", recv=True) if self.ack_required: self.ack() return msg
[docs] def ack(self): """ Sends an acknowledgement request to the server. """ self.send_with_action(b"", b"ack")
[docs] def cnt(self): """ Sends a "count" request to the server and receives the count of messages in the queue. Returns: bytes: The count of messages in the queue. """ return self.send_with_action(b"", b"cnt", recv=True)
[docs] def disconnect(self): """ Closes the connection to the server. """ self.socket.close()
if __name__ == "__main__": print("CLIENT is STARTED> ...") host = "127.0.0.1" port = 1234 if len(sys.argv) >= 2: host = sys.argv[1] if len(sys.argv) >= 3: port = int(sys.argv[2]) local_logger = logging.getLogger(__name__) ch = logging.NullHandler() ch.setLevel(logging.INFO) local_logger.addHandler(ch) c = SQClient(host=host, port=port, logger=local_logger) for i in range(10): print(f"\n=========== {i} ==========") print(f"CLIENT> send num {i}") c.enq(b"num %d" % i) print("CLIENT> deq") v = c.deq() print(f"CLIENT: get num: {v}" )