Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 53 additions & 39 deletions TwitchChannelPointsMiner/TwitchChannelPointsMiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional

from TwitchChannelPointsMiner.classes.Chat import ChatPresence, ThreadChat
from TwitchChannelPointsMiner.classes.entities.PubsubTopic import PubsubTopic
Expand Down Expand Up @@ -80,7 +81,7 @@ def __init__(
disable_ssl_cert_verification: bool = False,
disable_at_in_nickname: bool = False,
# Settings for logging and selenium as you can see.
priority: list = [Priority.STREAK, Priority.DROPS, Priority.ORDER],
priority: Optional[list] = None,
# This settings will be global shared trought Settings class
logger_settings: LoggerSettings = LoggerSettings(),
# Default values for all streamers
Expand All @@ -97,24 +98,21 @@ def __init__(

Settings.disable_at_in_nickname = disable_at_in_nickname

import socket

def is_connected():
try:
# resolve the IP address of the Twitch.tv domain name
socket.gethostbyname("twitch.tv")
return True
except OSError:
pass
return False

# check for Twitch.tv connectivity every 5 seconds
# Wait for Twitch.tv connectivity with a timeout to avoid hanging forever
error_printed = False
while not is_connected():
connectivity_interval = 5
connectivity_timeout = 60
connectivity_start = time.time()
while not internet_connection_available(host="twitch.tv", port=443):
if not error_printed:
logger.error("Waiting for Twitch.tv connectivity...")
error_printed = True
time.sleep(5)
if (time.time() - connectivity_start) >= connectivity_timeout:
logger.error(
"Unable to reach Twitch.tv after 60 seconds, exiting..."
)
sys.exit(0)
time.sleep(connectivity_interval)

# Analytics switch
Settings.enable_analytics = enable_analytics
Expand All @@ -140,7 +138,13 @@ def is_connected():
self.twitch = Twitch(self.username, user_agent, password)

self.claim_drops_startup = claim_drops_startup
self.priority = priority if isinstance(priority, list) else [priority]
if priority is None:
priority = [Priority.STREAK, Priority.DROPS, Priority.ORDER]
elif not isinstance(priority, list):
priority = [priority]
else:
priority = list(priority)
self.priority = priority

self.streamers: list[Streamer] = []
self.events_predictions = {}
Expand Down Expand Up @@ -203,29 +207,37 @@ def analytics(

def mine(
self,
streamers: list = [],
blacklist: list = [],
streamers: Optional[list] = None,
blacklist: Optional[list] = None,
followers: bool = False,
followers_order: FollowersOrder = FollowersOrder.ASC,
):
self.run(streamers=streamers, blacklist=blacklist, followers=followers)
self.run(
streamers=streamers,
blacklist=blacklist,
followers=followers,
followers_order=followers_order,
)

def run(
self,
streamers: list = [],
blacklist: list = [],
streamers: Optional[list] = None,
blacklist: Optional[list] = None,
followers: bool = False,
followers_order: FollowersOrder = FollowersOrder.ASC,
):
if self.running:
logger.error("You can't start multiple sessions of this instance!")
else:
logger.info(
f"Start session: '{self.session_id}'", extra={"emoji": ":bomb:"}
)
self.running = True
self.start_datetime = datetime.now()
return

streamers = [] if streamers is None else streamers
blacklist = [] if blacklist is None else blacklist

logger.info(f"Start session: '{self.session_id}'", extra={"emoji": ":bomb:"})
self.running = True
self.start_datetime = datetime.now()

try:
self.twitch.login()

if self.claim_drops_startup is True:
Expand Down Expand Up @@ -292,17 +304,17 @@ def run(
# 1. Load channel points and auto-claim bonus
# 2. Check if streamers are online
# 3. DEACTIVATED: Check if the user is a moderator. (was used before the 5th of April 2021 to deactivate predictions)
for streamer in self.streamers:
time.sleep(random.uniform(0.3, 0.7))
try:
self.twitch.load_channel_points_context(streamer)
self.twitch.check_streamer_online(streamer)
# self.twitch.viewer_is_mod(streamer)
except StreamerDoesNotExistException:
logger.info(
f"Streamer {streamer.username} does not exist",
extra={"emoji": ":cry:"},
)
invalid_streamers = self.twitch.initialize_streamers_context(self.streamers)
if invalid_streamers:
self.streamers = [
streamer
for streamer in self.streamers
if streamer.username not in invalid_streamers
]
if not self.streamers:
logger.error("No valid streamers available after initialization.")
self.end(0, 0)
return

self.original_streamers = [
streamer.channel_points for streamer in self.streamers
Expand Down Expand Up @@ -411,6 +423,8 @@ def run(
self.twitch.load_channel_points_context(
self.streamers[index]
)
finally:
self.running = False

def end(self, signum, frame):
if not self.running:
Expand Down Expand Up @@ -510,4 +524,4 @@ def __print_report(self):
logger.info(
f"{streamer_gain}\n{streamer_history}",
extra={"emoji": ":moneybag:"},
)
)
43 changes: 43 additions & 0 deletions TwitchChannelPointsMiner/classes/Twitch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import re
import string
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import validators
# import json
Expand Down Expand Up @@ -696,6 +698,40 @@ def load_channel_points_context(self, streamer):
if streamer.settings.community_goals is True:
self.contribute_to_community_goals(streamer)

Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate code: The check if streamer.settings.community_goals is True and call to self.contribute_to_community_goals(streamer) appears three times (lines 698-699, 701-702). This appears to be unintentional duplication that should be removed, keeping only one instance.

Suggested change
if streamer.settings.community_goals is True:
self.contribute_to_community_goals(streamer)

Copilot uses AI. Check for mistakes.
def initialize_streamers_context(self, streamers, max_workers=5):
if not streamers:
return set()

failed_streamers = set()

def _load_streamer_context(streamer):
time.sleep(random.uniform(0.3, 0.7))
self.load_channel_points_context(streamer)
self.check_streamer_online(streamer)

workers = max(1, min(max_workers, len(streamers)))
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {
executor.submit(_load_streamer_context, streamer): streamer
for streamer in streamers
}
for future in as_completed(futures):
streamer = futures[future]
try:
future.result()
except StreamerDoesNotExistException:
failed_streamers.add(streamer.username)
logger.info(
f"Streamer {streamer.username} does not exist",
extra={"emoji": ":cry:"},
)
except Exception:
logger.error(
f"Failed to initialize streamer {streamer.username}",
exc_info=True,
)
return failed_streamers

def make_predictions(self, event):
decision = event.bet.calculate(event.streamer.channel_points)
# selector_index = 0 if decision["choice"] == "A" else 1
Expand Down Expand Up @@ -934,11 +970,18 @@ def claim_all_drops_from_inventory(self):
drop.is_claimed = self.claim_drop(drop)
time.sleep(random.uniform(5, 10))

def __streamers_require_campaign_sync(self, streamers):
return any(streamer.drops_condition() for streamer in streamers)

def sync_campaigns(self, streamers, chunk_size=3):
campaigns_update = 0
campaigns = []
while self.running:
try:
if not self.__streamers_require_campaign_sync(streamers):
campaigns = []
self.__chuncked_sleep(60, chunk_size=chunk_size)
continue
# Get update from dashboard each 60minutes
if (
campaigns_update == 0
Expand Down