Source code for social.sniffer

# -*- coding: utf-8 -*-
"""
============================================================
social.sniffer module containing Sniffer classes and methods
============================================================
   The *Sniffer* retrieves hashtaged posts from the desired Rocket.Chat channel.
      In between is a sqlite3 base with all messages and required metadata

   .. warning:: Coding style is CamelCase for classes and lowercase_separated_by_underscores A.K.A snake_case for
      methods and variables.
"""
# Basic python3 libraries
import logging
import re
import sqlite3
from os import path
from time import sleep

# Rocket.Chat API to get messages
from rocketchat_API.rocketchat import RocketChat

# Inherit from Bot class
from social.bot import Bot
# Import of the configuration object
from social.utils.configuration import CONFIG


[docs]class SnifferException(Exception): """Specific exception raised by the Sniffer class""" pass
[docs]class Sniffer(Bot): """ The Sniffer, listen to Rocket.Chat, search the *#public* in messages to load them in sqlite3 base """ def __init__(self, rc_user, rc_password): """ **Constructor of the Sniffer class** :param rc_user: User of the account that sniff on Rocket.Chat :param rc_password: Password of the user account that sniff on Rocket.Chat :raise SnifferException: If general channel not found """ Bot.__init__(self, rc_user, rc_password) self.rocket_chat = RocketChat(self.login, self.password, server_url=CONFIG["Sniffer"]["Rocket.Chat+"]["URL"]) rc_channel_list = self.rocket_chat.channels_list().json()['channels'] general_bool = False for channel in rc_channel_list: if CONFIG["Sniffer"]["Rocket.Chat+"]["channel"] == channel["_id"]: general_bool = True if general_bool is False: raise SnifferException("No Rocket.Chat general channel to spy on") self.room_id = CONFIG["Sniffer"]["Rocket.Chat+"]["channel"] self.log("Rocket.Chat+ Sniffer initialized with account {0} on computer {1}".format(self.login, self.computer)) def __repr__(self): """Representation of the Sniffer class instance""" return """Sniffer thread on {0} to sniff Rocket.Chat with account {1}""".format(self.computer, self.login) def __del__(self): """Log out from rocket chat account""" self.rocket_chat.logout()
[docs] def run(self): """Starting the internal methods of the Sniffer Thread""" # Initialize variable and the database before the loop _ids = [] except_bool = True database = sqlite3.connect(path.join(self.database_directory, self.database)) database.execute( '''CREATE TABLE IF NOT EXISTS public ( ID INTEGER PRIMARY KEY AUTOINCREMENT, _id TEXT NOT NULL, origin TEXT NOT NULL, message TEXT NOT NULL, facebook INT NOT NULL DEFAULT 0, linkedin INT NOT NULL DEFAULT 0, twitter INT NOT NULL DEFAULT 0, ts TIMESTAMP, CONSTRAINT constraint_id UNIQUE (_id) ) ;''') database.close() # Can be stopped with Ctrl-C (SIGINT) try: # The main loop while True: # Retrieve the last messages from the RC_CHANNEL_ID_TO_SNIFF channel message_data = \ self.rocket_chat.channels_history(CONFIG["Sniffer"]["Rocket.Chat+"]["channel"], count=1).json()['messages'][0] _id = message_data["_id"] if _id not in _ids and bool(re.match(r".*#public.*", message_data["msg"])) is True: # Regex substitution of the Emoticons no_emoticon_message = re.sub(r"\:.*\:", "", message_data["msg"]) # Final message composed of the original one without emoticon after the name of writer message = "{0}:\n {1}".format(message_data['u']["name"], no_emoticon_message) # Insert message without the #public in the sqlite3 DB_FOLDER/DB database self.insert_message(_id, self.login, message.replace("#public", ""), first_except=except_bool ) except_bool = False # Append list of already inserted message and remove duplicate _ids.append(_id) set(_ids) # Sniff smoothly on the server sleep(5) except KeyboardInterrupt: pass
[docs] def insert_message(self, _id, origin, message, first_except=False): """ Function inserting in the social database, table message, with or without sqlite3.IntegrityError for UNIQUE CONSTRAINT depending on first_except :param _id: Id of the message coming from Rocket.Chat :param origin: username of the robot account :param message: The message itself :param first_except: Kwarg, default is False, True for first iteration only """ with self.database_semaphore: database = sqlite3.connect(path.join(self.database_directory, self.database)) request = """INSERT INTO public ( _id, origin, message, facebook, linkedin, twitter, ts ) VALUES ( ?, ?, ?, 0, 0, 0, DATETIME('now', 'localtime') ) ;""" if first_except is True: try: database.execute(request, (_id, origin, message,)) except sqlite3.IntegrityError as err: # Log out the error message logging.error(str(err)) if first_except is False: database.execute(request, (_id, origin, message,)) database.commit() database.close() self.log("Sniffer for account {0} on computer {1} inserted a post {2} in message table".format(self.login, self.computer, _id))