from datetime import datetime import json from browser_manager import BrowserManager from selenium.common.exceptions import TimeoutException, WebDriverException from selenium.webdriver.chrome.options import Options from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.by import By from utils.config_loader import ConfigLoader from wwproxy.wwproxy import get_new_proxy, get_current_proxy, sanitize_for_log from utils.logger import logger import os import sys import time from random import uniform, randint, choice from selenium.webdriver import Chrome import requests import hashlib import threading import warnings warnings.filterwarnings("ignore") import urllib3 urllib3.disable_warnings() # from utils.retry_manager import RetryManager # Tắt tất cả các warning logs không cần thiết os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" os.environ["PYTHONIOENCODING"] = "utf-8" os.environ["CHROME_LOG_FILE"] = os.devnull os.environ["GLOG_minloglevel"] = "3" os.environ["GLOG_logtostderr"] = "0" os.environ["GLOG_log_dir"] = os.devnull os.environ["CHROME_HEADLESS"] = "1" os.environ["CHROME_NO_SANDBOX"] = "1" # Tắt Chrome GPU và WebGL warnings os.environ["CHROME_DISABLE_GPU"] = "1" os.environ["CHROME_DISABLE_SOFTWARE_RASTERIZER"] = "1" os.environ["CHROME_DISABLE_WEB_SECURITY"] = "1" os.environ["CHROME_DISABLE_FEATURES"] = "VizDisplayCompositor" # Tắt Google API warnings os.environ["GOOGLE_API_USE_MTLS_ENDPOINT"] = "never" os.environ["GOOGLE_CLOUD_DISABLE_GRPC_LOGS"] = "true" if sys.platform == "win32": import subprocess subprocess.DEVNULL = open(os.devnull, "w") # Redirect stderr to devnull for Chrome import warnings warnings.filterwarnings("ignore") class AutoLogin(BrowserManager): def __init__(self, website_url, api_key=None, proxy=None, connection_pool=None, headers_manager=None, memory_manager=None, user_api_key=None, use_proxy=True, use_chromium=False, thread_id=1): super().__init__() self.website_url = website_url self.api_key = api_key self.proxy = proxy self.connection_token = None self.current_username = None self.token = "" self.api_results = {} self.use_proxy = use_proxy self.proxy_change_interval = 20 self.last_proxy_change = 0 self.current_proxy_ip = None self.thread_id = thread_id self.fingerprint = self.generate_fingerprint() self.check_code_encrypt = "/QUDEBgsYFVQNf3OEK4wkw==" # Initialize retry manager # self.retry_manager = RetryManager() # Optimized components self.connection_pool = connection_pool self.headers_manager = headers_manager self.memory_manager = memory_manager # Session key for this login instance self.session_key = f"login_{int(time.time())}_{randint(1000, 9999)}" # WWProxy keys self.proxy_keys = self.load_proxy_keys() if use_proxy else [] # Load anti-detection config self.anti_detection_config = self.load_anti_detection_config() self.current_headers = self.generate_dynamic_headers() self.use_chromium = use_chromium def load_proxy_keys(self): """Load proxy keys từ file""" try: with open("config/keys.txt", "r") as f: return [line.strip() for line in f if line.strip()] except FileNotFoundError: logger.error("Lỗi file: Không tìm thấy keys.txt") return [] except PermissionError: logger.error("Lỗi file: Không có quyền đọc keys.txt") return [] except UnicodeDecodeError as e: logger.error(f"Lỗi file: Lỗi encoding keys.txt - {sanitize_for_log(str(e))}") return [] except Exception as e: logger.error(f"Lỗi file: keys.txt - {sanitize_for_log(str(e))}") return [] def load_anti_detection_config(self): """Load anti-detection configuration""" try: with open("config/anti_detection_config.json", "r", encoding="utf-8") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError) as e: logger.warning(f"Anti-detection config not found, using defaults: {sanitize_for_log(str(e))}") return self.get_default_anti_detection_config() except Exception as e: logger.error(f"Error loading anti-detection config: {sanitize_for_log(str(e))}") return self.get_default_anti_detection_config() def generate_fingerprint(self): """Tạo fingerprint duy nhất cho mỗi thread/account""" unique_string = f"{self.thread_id}_{int(time.time())}_{randint(10000, 99999)}" return hashlib.md5(unique_string.encode()).hexdigest() def get_default_anti_detection_config(self): """Default anti-detection configuration""" return { "user_agents": { "chrome_windows": [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 OPR/121.0.0.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 OPR/122.0.0.0" ], "random_selection": True }, "headers": { "accept_languages": ["vi-VN,vi;q=0.9,en-US;q=0.8,en;q=0.7", "en-US,en;q=0.9,vi;q=0.8"], "accept_encodings": ["gzip, deflate, br", "gzip, deflate, br, zstd"], "sec_ch_ua_platforms": ['"Windows"', '"macOS"', '"Linux"'] }, "timing": { "page_load_delay": {"min": 2.0, "max": 5.0}, "click_delay": {"min": 0.3, "max": 0.8} }, "user_agent": { "custom": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138 Safari/537.36" } } def generate_dynamic_headers(self): """Generate dynamic headers for anti-detection""" config = self.anti_detection_config # Random user agent selection if enabled if config["user_agents"].get("random_selection", True): user_agent = choice(config["user_agents"]["chrome_windows"]) else: user_agent = config["user_agent"].get("custom", config["user_agents"]["chrome_windows"][0]) accept_language = choice(config["headers"]["accept_languages"]) accept_encoding = choice(config["headers"]["accept_encodings"]) sec_ch_ua_platform = choice(config["headers"]["sec_ch_ua_platforms"]) # Extract Chrome version from user agent for Sec-Ch-Ua chrome_version = "138" if "Chrome/" in user_agent: try: chrome_version = user_agent.split("Chrome/")[1].split(".")[0] except: chrome_version = "138" # Default to latest if parsing fails return { "User-Agent": user_agent, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", "Accept-Language": accept_language, "Accept-Encoding": accept_encoding, "Cache-Control": "no-cache", "Pragma": "no-cache", "Sec-Ch-Ua": f'"Not)A;Brand";v="8", "Chromium";v="{chrome_version}"', "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": sec_ch_ua_platform, "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1", "Upgrade-Insecure-Requests": "1" } def should_change_proxy(self): """Kiểm tra có nên đổi proxy không""" if not self.use_proxy: return False current_time = time.time() return (current_time - self.last_proxy_change) >= self.proxy_change_interval def get_fresh_proxy(self, force_new=False): """Lấy proxy mới từ WWProxy với tùy chọn đổi IP theo thời gian""" if not self.use_proxy: logger.info("🚫 Proxy disabled - using direct connection") return None if not self.proxy_keys: logger.warning("⚠️ Không có proxy keys") return None try: proxy_key = choice(self.proxy_keys) current_time = time.time() # Kiểm tra có cần đổi proxy không if not force_new and self.current_proxy_ip and not self.should_change_proxy(): remaining_time = self.proxy_change_interval - (current_time - self.last_proxy_change) logger.info(f"🕐 Dùng proxy hiện tại, đổi sau {remaining_time:.0f}s: {self.current_proxy_ip}") return self.current_proxy_ip # Lấy proxy mới new_proxy = get_new_proxy(proxy_key) if new_proxy and new_proxy.get('ipAddress'): ip = sanitize_for_log(str(new_proxy['ipAddress'])) port = sanitize_for_log(str(new_proxy['port'])) proxy_str = f"{ip}:{port}" # Cập nhật thông tin proxy self.current_proxy_ip = proxy_str self.last_proxy_change = current_time logger.info(f"🔄 Proxy mới (đổi sau 20s): {proxy_str}") return proxy_str logger.warning(f"⚠️ Không lấy được proxy cho {sanitize_for_log(str(self.current_username))}") return None except (KeyError, TypeError, ValueError) as e: logger.error(f"❌ Proxy data error for {sanitize_for_log(str(self.current_username))}: {sanitize_for_log(str(e))}") return None except (ConnectionError, TimeoutError) as e: logger.error(f"❌ Network error getting proxy: {sanitize_for_log(str(e))}") return None except Exception as e: logger.warning(f"⚠️ Không lấy được proxy cho {sanitize_for_log(str(self.current_username))}: {sanitize_for_log(str(e))}") return None def try_find_element(self, locators, timeout=10, condition=EC.presence_of_element_located): """Helper to try multiple locators for finding an element""" for by, selector in locators: try: return WebDriverWait(self.driver, timeout).until( condition((by, selector)) ) except TimeoutException: logger.debug(f"Locator not found: {by} - {selector}") continue raise TimeoutException("Element not found with any locator") def try_click_element(self, locators, timeout=10): """Helper to try clicking with multiple locators""" elem = self.try_find_element(locators, timeout, EC.element_to_be_clickable) try: elem.click() except WebDriverException: # Nếu click bị chặn, dùng JavaScript click self.driver.execute_script("arguments[0].click();", elem) return True def save_balance_to_txt(self, username, balance, password=""): """Lưu số dư ra file txt""" try: os.makedirs("results", exist_ok=True) with open("results/balance_acc.txt", "a", encoding="utf-8") as f: f.write(f"{username}|{password}|số dư: {balance}\n") logger.info(f"Xuất file thành công: results/balance_acc.txt") except (OSError, IOError) as e: logger.error(f"Lỗi file: Không thể ghi balance_acc.txt - {sanitize_for_log(str(e))}") except Exception as e: logger.error(f"Lỗi file: {sanitize_for_log(str(e))}") def save_api_results_to_json(self, username, results): """Lưu kết quả API ra file json""" try: os.makedirs("results", exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"results/api_results_{username}_{timestamp}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(results, f, ensure_ascii=False, indent=2) logger.info("✅ Saved API results to JSON") except (OSError, IOError, json.JSONEncodeError) as e: logger.error(f"❌ Error saving API results: {sanitize_for_log(str(e))}") except Exception as e: logger.error(f"❌ Unexpected error saving API results: {sanitize_for_log(str(e))}") def step1_access_website(self): """Truy cập website với tùy chọn proxy""" try: # Lấy proxy nếu được bật if self.use_proxy: fresh_proxy = self.get_fresh_proxy(force_new=True) if fresh_proxy: self.proxy = fresh_proxy logger.info(f"🌐 Sử dụng WWProxy: {fresh_proxy}") else: logger.warning(f"⚠️ Không lấy được proxy cho {sanitize_for_log(str(self.current_username))}, dùng IP thật") self.proxy = None else: self.proxy = None logger.info("🌐 Sử dụng kết nối trực tiếp (không proxy)") # Tạo browser với anti-detection headers if not self.create_browser_with_headers(self.proxy): logger.error("❌ Không thể khởi tạo trình duyệt") return False logger.info(f"🌐 Truy cập website: {self.website_url}") self.driver.get(self.website_url) time.sleep(uniform(2.0, 4.0)) self.wait.until(EC.presence_of_element_located((By.TAG_NAME, "body"))) # Apply anti-detection scripts self.apply_anti_detection_scripts() logger.info("✅ Truy cập website thành công") return True except (TimeoutException, WebDriverException) as e: logger.error(f"❌ Browser error accessing website: {sanitize_for_log(str(e))}") return False except Exception as e: logger.error(f"❌ Unexpected error accessing website: {sanitize_for_log(str(e))}") return False def create_browser_with_headers(self, proxy=None): """Tạo trình duyệt với headers chống phát hiện""" try: # Sử dụng headers_manager nếu có, nếu không thì dùng headers đã tạo if self.headers_manager and hasattr(self.headers_manager, 'get_browser_headers'): headers = self.headers_manager.get_browser_headers() else: headers = self.current_headers # Tạo trình duyệt với headers tùy chỉnh success = self.create_browser(proxy) if success and self.driver: try: # Thiết lập headers tùy chỉnh qua CDP self.driver.execute_cdp_cmd('Network.setUserAgentOverride', { "userAgent": headers.get("User-Agent"), "acceptLanguage": headers.get("Accept-Language"), "platform": "Win32" }) # Bật network domain để sửa đổi headers self.driver.execute_cdp_cmd('Network.enable', {}) logger.info("✅ Tạo trình duyệt với headers chống phát hiện thành công") except Exception as cdp_error: logger.warning(f"⚠️ Không thể thiết lập CDP headers: {sanitize_for_log(str(cdp_error))}") return True return False except Exception as e: logger.error(f"❌ Lỗi tạo trình duyệt với headers: {sanitize_for_log(str(e))}") return self.create_browser(proxy) # Fallback về trình duyệt thông thường def apply_anti_detection_scripts(self): """Apply anti-detection JavaScript scripts""" try: if not self.driver: return # Anti-detection scripts scripts = [ # Hide webdriver property "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})", # Mock plugins "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})", # Mock languages "Object.defineProperty(navigator, 'languages', {get: () => ['vi-VN', 'vi', 'en-US', 'en']})", # Mock permissions "const originalQuery = window.navigator.permissions.query; window.navigator.permissions.query = (parameters) => (parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters));", # Mock chrome runtime "window.chrome = { runtime: {} };", # Hide automation indicators "delete window.cdc_adoQpoasnfa76pfcZLmcfl_Array; delete window.cdc_adoQpoasnfa76pfcZLmcfl_Promise; delete window.cdc_adoQpoasnfa76pfcZLmcfl_Symbol;", # Mock screen properties "Object.defineProperty(screen, 'availWidth', {get: () => 1920}); Object.defineProperty(screen, 'availHeight', {get: () => 1040});", # Mock timezone "Intl.DateTimeFormat = function() { return { resolvedOptions: () => ({ timeZone: 'Asia/Ho_Chi_Minh' }) } };" ] for script in scripts: try: self.driver.execute_script(script) except Exception as script_error: logger.debug(f"Script execution warning: {sanitize_for_log(str(script_error))}") logger.info("✅ Anti-detection scripts applied") except Exception as e: logger.warning(f"⚠️ Error applying anti-detection scripts: {sanitize_for_log(str(e))}") def step2_close_notifications(self): """Tắt thông báo với nhiều selector và XPath""" try: # Thử nhấn ESC trước self.driver.find_element(By.TAG_NAME, "body").send_keys(Keys.ESCAPE) time.sleep(uniform(0.3, 0.7)) close_locators = [ (By.CSS_SELECTOR, "span[ng-click='$ctrl.ok()'][translate='Common_Closed']"), (By.CSS_SELECTOR, "span.ng-scope:contains('Đóng')"), (By.CSS_SELECTOR, "._9PbX_LFgnXvcTnC_3cq6B span"), (By.CSS_SELECTOR, "#rootBody > div.modal button"), (By.CSS_SELECTOR, "gupw-dialog-announcement button"), (By.XPATH, "//*[contains(text(), 'Đóng')]"), (By.XPATH, "//button[contains(@class, 'close') or contains(@aria-label, 'Close')]"), (By.CSS_SELECTOR, "button[aria-label='Close'], div.modal button.close"), ] try: self.try_click_element(close_locators, timeout=2) time.sleep(uniform(0.3, 0.7)) logger.info("✅ Đã tắt thông báo") except TimeoutException: logger.info("ℹ️ Không có thông báo hoặc đã tắt") return True except (TimeoutException, WebDriverException) as e: logger.info(f"ℹ️ Browser error closing notifications: {sanitize_for_log(str(e))}") return True except Exception as e: logger.info(f"ℹ️ Unexpected error closing notifications: {sanitize_for_log(str(e))}") return True def step3_click_login_button(self): """Nhấn nút ĐĂNG NHẬP với nhiều selector và XPath""" try: # Các locators cho nút đăng nhập login_locators = [ (By.CSS_SELECTOR, "button[ng-click='$ctrl.openLoginModal()'][translate='Shared_Login']"), (By.CSS_SELECTOR, "button.ng-scope._2mBNgBjbvImj-b_6WuwAFm"), (By.CSS_SELECTOR, "button._2mBNgBjbvImj-b_6WuwAFm"), (By.CSS_SELECTOR, "#app button.ng-scope._2mBNgBjbvImj-b_6WuwAFm"), (By.XPATH, "//button[contains(text(), 'Đăng nhập')]"), (By.XPATH, "//*[contains(@ng-click, 'openLoginModal')]"), (By.XPATH, "//button[translate='Shared_Login']"), (By.CSS_SELECTOR, "button[translate='Shared_Login']"), (By.CSS_SELECTOR, "a[ng-click*='login']"), (By.XPATH, "//a[contains(text(), 'Đăng nhập')]"), ] try: # Attempt to click the login button using multiple locators clicked = self.try_click_element(login_locators, timeout=5) if clicked: time.sleep(5) logger.info("✅ Đã nhấn nút đăng nhập") return True else: logger.error("❌ Không tìm thấy nút đăng nhập") return False except TimeoutException as e: logger.error(f"❌ Không tìm thấy nút đăng nhập: {sanitize_for_log(str(e))}") return False except WebDriverException as e: logger.error(f"❌ Browser error clicking login button: {sanitize_for_log(str(e))}") return False except Exception as e: logger.error(f"❌ Unexpected error clicking login button: {sanitize_for_log(str(e))}") return False except Exception as e: logger.error(f"❌ Lỗi nhấn nút đăng nhập: {sanitize_for_log(str(e))}") return False def step4_fill_login_form(self, username, password): """Điền thông tin vào form đăng nhập với multiple locators""" try: # Locators cho username input username_locators = [ (By.CSS_SELECTOR, "input[ng-model='$ctrl.user.account.value']"), (By.XPATH, "//input[contains(@placeholder, 'Tài khoản')]"), (By.CSS_SELECTOR, "input[type='text'][placeholder*='Tài khoản']"), (By.NAME, "username"), # If applicable ] # Locators cho password input password_locators = [ (By.CSS_SELECTOR, "input[ng-model='$ctrl.user.password.value']"), (By.XPATH, "//input[contains(@placeholder, 'Mật khẩu')]"), (By.CSS_SELECTOR, "input[type='password']"), (By.NAME, "password"), ] try: username_input = self.try_find_element(username_locators) username_input.clear() username_input.send_keys(username) except (TimeoutException, WebDriverException) as e: logger.error(f"❌ Browser error with username field: {sanitize_for_log(str(e))}") return False except Exception as e: logger.error(f"❌ Unexpected error with username field: {sanitize_for_log(str(e))}") return False password_input = self.try_find_element(password_locators) password_input.clear() password_input.send_keys(password) logger.info("✅ Đã điền thông tin đăng nhập") return True except (TimeoutException, WebDriverException) as e: logger.error(f"❌ Browser error filling form: {sanitize_for_log(str(e))}") return False except Exception as e: logger.error(f"❌ Unexpected error filling form: {sanitize_for_log(str(e))}") return False def step5_solve_captcha_auto(self): """Tự động giải captcha bằng API""" try: from utils.config_loader import ConfigLoader config = ConfigLoader(os.getcwd()) api_key = config.get_api_key() if not api_key or len(api_key.strip()) == 0: logger.info("Không có API key, bỏ qua giải captcha") return True # Tìm ô nhập captcha và click để hiện captcha captcha_input = WebDriverWait(self.driver, 1.5).until( EC.element_to_be_clickable((By.CSS_SELECTOR, "input[ng-model='$ctrl.code']")) ) captcha_input.click() time.sleep(0.5) # Tìm captcha image với các selector captcha_selectors = [ "img[ng-class*='captcha']", "img[ng-class='$ctrl.styles.captcha']", "img[ng-src*='captcha']", "img[src*='captcha']", "img[src^='data:image/png;base64']" ] captcha_image = None for selector in captcha_selectors: try: captcha_image = self.driver.find_element(By.CSS_SELECTOR, selector) if captcha_image.is_displayed(): break except: continue if not captcha_image: logger.error("Lỗi xử lý captcha: Không tìm thấy captcha image") return False # Lấy base64 của captcha captcha_src = captcha_image.get_attribute("src") if "base64," in captcha_src: base64_image = captcha_src.split(",")[1] else: # Chuyển đổi image thành base64 base64_image = self.driver.execute_script(""" var canvas = document.createElement('canvas'); var ctx = canvas.getContext('2d'); var img = arguments[0]; canvas.width = img.width; canvas.height = img.height; ctx.drawImage(img, 0, 0); return canvas.toDataURL('image/png').split(',')[1]; """, captcha_image) # Gọi API để giải captcha result = self.call_captcha_api(base64_image, api_key) if result: # Nhập kết quả vào ô captcha captcha_input.clear() captcha_input.send_keys(result) logger.success(f"GIẢI CAPTCHA THÀNH CÔNG - {result}") return True else: logger.error("Lỗi xử lý captcha: Không giải được captcha") return False except TimeoutException: logger.error(f"Lỗi xử lý captcha: Timeout khi tìm captcha - {self.driver.current_url}") return False except Exception as e: logger.error(f"Lỗi xử lý captcha: {type(e).__name__}: {sanitize_for_log(str(e))}") return False def call_captcha_api(self, base64_image, api_key): """Gọi API giải captcha""" if not api_key or not base64_image: logger.error("Lỗi xử lý captcha: API key hoặc captcha rỗng") return None try: payload = { "key": api_key, "type": "imagetotext", "img": f"data:image/png;base64,{base64_image}", "numeric": 2, "min_len": 4, "max_len": 4 } import requests response = requests.post( "https://autocaptcha.pro/apiv3/process", json=payload, headers={'Content-Type': 'application/json'}, timeout=10 ) if response.status_code == 200: try: data = response.json() if data.get('success'): result = data.get('result', '') or data.get('captcha', '') result = result.strip() if result and len(result) >= 3: return result.upper() else: logger.error(f"Lỗi xử lý captcha: API trả về lỗi - {data.get('message', 'Unknown')}") except json.JSONDecodeError: clean_text = ''.join(c for c in response.text if c.isprintable())[:50] logger.error(f"Lỗi xử lý captcha: Không thể parse JSON - {sanitize_for_log(clean_text)}...") else: logger.error(f"Lỗi xử lý captcha: HTTP {response.status_code} - {response.reason}") except requests.Timeout: logger.error("Lỗi xử lý captcha: Timeout khi gọi API") except Exception as e: logger.error(f"Lỗi xử lý captcha: {sanitize_for_log(str(e))}") return None def step6_submit_login(self): """Nhấn nút ĐĂNG NHẬP để submit với multiple locators""" try: # Locators cho submit button submit_locators = [ (By.CSS_SELECTOR, "button[type='submit']._1elJEDoklSJeZCRhRorPTp"), (By.CSS_SELECTOR, "button[type='submit']"), (By.XPATH, "//button[contains(text(), 'Đăng nhập')]"), (By.CSS_SELECTOR, "button[ng-click='$ctrl.login()']"), (By.XPATH, "//form//button[@type='submit']"), ] self.try_click_element(submit_locators) time.sleep(uniform(0.1, 0.3)) logger.info("✅ Đã submit đăng nhập") return True except (TimeoutException, WebDriverException) as e: logger.error(f"❌ Browser error submitting login: {sanitize_for_log(str(e))}") return False except Exception as e: logger.error(f"❌ Unexpected error submitting login: {sanitize_for_log(str(e))}") return False def check_login_error(self): """Check login errors and success""" try: # Chờ 5s cho trang load hoàn toàn time.sleep(5) # Kiểm tra token đăng nhập trước token = self._get_value_by_name(self.driver.get_cookies(), "_pat") if token: logger.info(f"✅ Đã có token đăng nhập: {token[:20]}...") return None # Đăng nhập thành công # Kiểm tra URL thay đổi current_url = self.driver.current_url if any(keyword in current_url for keyword in ["dashboard", "home", "main", "user", "profile"]): logger.info(f"✅ URL đã thay đổi: {current_url}") return None # Kiểm tra lỗi cụ thể error_selectors = [ "div[bind-html-compile='$ctrl.content']", ".error", ".alert-danger", ".notification-error", ".toast-error", ".message-error", "[ng-show*='error']" ] for selector in error_selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) for el in elements: if el.is_displayed(): text = el.text.strip() if "Lỗi mã xác minh" in text or "captcha" in text.lower(): return "captcha_error" elif "Tài khoản" in text and "sai" in text: return "credential_error" elif "Lỗi đăng nhập" in text: return "login_error" elif text and len(text) > 5: return text except: continue # Kiểm tra modal đăng nhập vẫn còn modal_selectors = ["div.modal[role='dialog']", ".login-modal", "input[ng-model*='account']"] for selector in modal_selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) if elements and any(el.is_displayed() for el in elements): return "login_modal_still_open" except: continue return None # Không có lỗi except Exception as e: logger.error(f"Error checking login status: {sanitize_for_log(str(e))}") return None def get_signalr_token(self): """Fetch the SignalR connection token by negotiating with the server.""" try: if self.connection_token: return self.connection_token # Get cookies from browser cookies = { cookie["name"]: cookie["value"] for cookie in self.driver.get_cookies() } # Assume base URL from current URL base_url = self.driver.current_url.rsplit("/", 1)[0] # Typical SignalR negotiate endpoint; adjust 'hub' to actual hub name if known (e.g., 'chatHub') negotiate_url = f"{base_url}/signalr/negotiate?clientProtocol=2.1" # Using JSON protocol headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36", "Accept": "*/*", } # Make the negotiate request response = requests.post(negotiate_url, headers=headers, cookies=cookies) if response.status_code == 200: data = response.json() self.connection_token = data.get("ConnectionToken") if self.connection_token: logger.info("✅ Successfully fetched SignalR connection token") # print(f"====> self.connection_token: {self.connection_token}") # print(f"====> response.cookies.get_dict(): {response.cookies.get_dict()}") return self.connection_token else: logger.error("❌ Connection token not found in negotiate response") else: logger.error( f"❌ Negotiate request failed with status: {response.status_code}" ) return None except Exception as e: logger.error(f"❌ Error fetching SignalR token: {e}") return None def _get_value_by_name(self, data, target_name): for item in data: if item.get("name") == target_name: return item.get("value") return None def make_api_request(self, endpoint, method="GET", data=None): """Gọi API tối ưu""" if not self.connection_token: self.get_signalr_token() if not self.connection_token: return None try: cookies = {c["name"]: c["value"] for c in self.driver.get_cookies()} full_url = f"{self.driver.current_url.rsplit('/', 1)[0]}{endpoint}" headers = { "authorization": f"Bearer {self.token}", "referer": self.website_url, "x-requested-with": "XMLHttpRequest", "x-sec-data": "jdTtyMJAGl/2RMv8ViwRZd7g1SaqOyh1g6Yy79XjMVPBw5egt3shkyt0sD/WYr3vQ1ro7BBCL5TWsAA0vtqRBe8NdgX44tuXl6lqc7WCHG6WX382fPDyyLPDlyQWS79UGrFh58MAWKp3L9uBbVjSxidhgVbG5mSxIOM0lzaTTGU=", "x-fingerprint": self.fingerprint } import requests import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) if method == "POST": response = requests.post(full_url, cookies=cookies, headers=headers, json=data or {}, timeout=5, verify=False) else: response = requests.get(full_url, cookies=cookies, headers=headers, timeout=5, verify=False) if response.status_code == 200: result = response.json() if result.get("Code") == 200: return result logger.error(f"Lỗi API {sanitize_for_log(endpoint)}: Code {result.get('Code')}") elif response.status_code == 399: logger.warning(f"API {sanitize_for_log(endpoint)}: Cần đăng nhập lại (HTTP 399)") return None else: logger.error(f"Lỗi API {sanitize_for_log(endpoint)}: HTTP {response.status_code}") return None except Exception as e: logger.error(f"Lỗi API {sanitize_for_log(endpoint)}: {sanitize_for_log(str(e))}") return None def step8_get_user_info(self): """Goi GET lấy thông tin user""" try: # Thử lấy token trước khi gọi API if not self.token: self.token = self._get_value_by_name(self.driver.get_cookies(), "_pat") if not self.token: logger.error("Không có token đăng nhập") return None data = self.make_api_request("/api/1.0/user/info") if data: self.api_results["user_info"] = data result = data.get("Result", {}) username = result.get("Account") balance = result.get("Balance") name = result.get("Name") ip = result.get("IP", "Unknown") logger.success(f"GET USER INFO thành công - Account: {sanitize_for_log(str(username))} | Balance: {sanitize_for_log(str(balance))} | Name: {sanitize_for_log(str(name))} | IP: {sanitize_for_log(str(ip))}") # Save balance and API results self.save_balance_to_txt(username, balance, "") self.save_api_results_to_json(username, self.api_results) return result else: logger.error("Lỗi GET USER INFO: Không nhận được dữ liệu") except Exception as e: logger.error(f"Lỗi GET USER INFO: {sanitize_for_log(str(e))}") return None def step9_get_red_envelope_list(self): """GỌI POST kiẻm tra hồng bao""" if not self.connection_token: self.get_signalr_token() if not self.connection_token: logger.error("Không thể lấy connection token") return 0 try: data = self.make_api_request("/api/0.0/RedEnvelope/GetRedEnvelopList") if data and data.get("Code") == 200: envelopes = data.get("Result", []) if envelopes: for envelope in envelopes: amount = envelope.get("Amount", 0) if amount > 0: logger.success(f"Đã nhận lì xì [{amount:.2f}]") return amount logger.info("Không có hồng bao") else: logger.info("Không có hồng bao") except Exception as e: logger.error(f"Lỗi GetRedEnvelopList: {sanitize_for_log(str(e))}") return 0 def step10_get_user_balance(self): """Gọi GET lấy số dư sau khi nhận hồng bao """ try: data = self.make_api_request("/api/1.0/member/balance") if data: balance = data.get("Result", {}).get("Balance", 0) logger.success(f"GET USER BALANCE thành công - Số dư: {balance}") return balance else: logger.error("Lỗi GET USER BALANCE: Không nhận được dữ liệu") except Exception as e: logger.error(f"Lỗi GET USER BALANCE: {sanitize_for_log(str(e))}") return None def _perform_login_steps(self, username, password): """Thực hiện các bước đăng nhập""" steps = [ self.step1_access_website, self.step2_close_notifications, self.step3_click_login_button, lambda: self.step4_fill_login_form(username, password), self.step5_solve_captcha_auto, self.step6_submit_login ] for step in steps: if not step(): return False return True def _handle_post_login(self, username, password=""): """Xử lý sau khi đăng nhập thành công""" self.token = self._get_value_by_name(self.driver.get_cookies(), "_pat") # Bước 1: Lấy thông tin user user_info = self.step8_get_user_info() if not user_info: logger.warning("Không lấy được thông tin user") return True initial_balance = user_info.get("Balance", 0) # Ghi file số dư ban đầu self.save_balance_to_txt(username, initial_balance, password) logger.info(f"{username}|{password}|số dư ban đầu: {initial_balance}") # Bước 2: Kiểm tra và nhận lì xì nếu có red_envelope_amount = self.step9_get_red_envelope_list() if red_envelope_amount > 0: logger.success(f"Đã nhận lì xì: {red_envelope_amount}") # Bước 3: Lấy lại số dư mới sau khi nhận lì xì final_balance = self.step10_get_user_balance() if final_balance is not None: self.save_balance_to_txt(username, final_balance, password) logger.info(f"{username}|{password}|số dư sau khi nhặt: {final_balance}") else: logger.info("Không có lì xì hoặc không nhận được lì xì") # Bước 4: Lưu kết quả API self.save_api_results_to_json(username, self.api_results) # Thông báo hoàn tất và đóng trình duyệt logger.info("Hoàn tất các bước đăng nhập, đang đóng trình duyệt...") if hasattr(self, "driver"): try: self.driver.quit() logger.info("Đã đóng trình duyệt thành công.") except Exception as e: logger.error(f"Lỗi khi đóng trình duyệt: {sanitize_for_log(str(e))}") def _cleanup_resources(self, username): """Dọn dẹp tài nguyên""" try: if self.memory_manager: self.memory_manager.clear_session_data(self.session_key) self.close_browser() logger.info(f"🧹 Cleaned up resources for {sanitize_for_log(username)}") except (WebDriverException, AttributeError) as cleanup_e: logger.warning(f"⚠️ Lỗi cleanup browser: {sanitize_for_log(str(cleanup_e))}") except Exception as cleanup_e: logger.error(f"❌ Lỗi cleanup không mong muốn: {sanitize_for_log(str(cleanup_e))}") def auto_login_process(self, username, password, max_retries=2): """Quy trình đăng nhập tự động với retry""" for attempt in range(max_retries + 1): try: self.current_username = username logger.set_context(thread_id=self.thread_id, account=username, proxy=self.current_proxy_ip or "") if attempt > 0: logger.info(f"Lần thử {attempt + 1}/{max_retries + 1} - Fingerprint: {self.fingerprint}") else: logger.info(f"Bắt đầu đăng nhập - Fingerprint: {self.fingerprint}") if not self._perform_login_steps(username, password): if attempt < max_retries: logger.warning(f"Lần {attempt + 1} thất bại, thử lại...") self._cleanup_resources(username) time.sleep(uniform(2.0, 4.0)) continue return False try: WebDriverWait(self.driver, 1.5).until( lambda driver: driver.execute_script("return document.readyState") == "complete" ) except TimeoutException: time.sleep(0.3) error = self.check_login_error() if error: if "captcha_error" in error: logger.error(f"Lỗi đăng nhập: Mã xác minh không đúng") elif "credential_error" in error: logger.error(f"Lỗi đăng nhập: Sai thông tin đăng nhập") return False # Không retry cho sai thông tin elif "login_modal_still_open" in error: logger.warning(f"Modal đăng nhập vẫn mở - chờ 15s kiểm tra token...") time.sleep(15) # Chờ lâu hơn # Kiểm tra token trực tiếp token = self._get_value_by_name(self.driver.get_cookies(), "_pat") if token: logger.success(f"ĐĂNG NHẬP THÀNH CÔNG - Token: {token[:20]}...") return self._handle_post_login(username, password) else: logger.error("Chờ 15s mà vẫn không có token - thử lại") else: logger.error(f"Lỗi đăng nhập: {sanitize_for_log(str(error))}") if attempt < max_retries: logger.warning(f"Lần {attempt + 1} thất bại, thử lại...") self._cleanup_resources(username) time.sleep(uniform(3.0, 6.0)) continue else: logger.error(f"Hết lần thử ({max_retries + 1}) - Dừng") return False logger.success("ĐĂNG NHẬP THÀNH CÔNG") return self._handle_post_login(username, password) except Exception as e: logger.error(f"Lỗi quy trình đăng nhập: {sanitize_for_log(str(e))}") if attempt < max_retries: logger.warning(f"Lần {attempt + 1} gặp lỗi, thử lại...") self._cleanup_resources(username) time.sleep(uniform(3.0, 6.0)) continue return False finally: pass # Cleanup đã được xử lý trong các nơi khác return False def process_account(username, password, thread_id, website_url, api_key): """Xử lý 1 tài khoản trong thread riêng""" auto_login = AutoLogin(website_url, api_key, thread_id=thread_id) logger.info(f"🔄 [{thread_id}] Fingerprint: {auto_login.fingerprint} - Account: {sanitize_for_log(str(username))}") result = auto_login.auto_login_process(username, password) if result: logger.info(f"✅ [{thread_id}] Thành công: {sanitize_for_log(str(username))}") else: logger.error(f"❌ [{thread_id}] Thất bại: {sanitize_for_log(str(username))}") time.sleep(uniform(1.0, 3.0)) def main(): """Hàm main - xử lý liên tục toàn bộ danh sách""" config = ConfigLoader() website_url = config.get_url() api_key = config.get_api_key() accounts = config.get_all_accounts() if not website_url: logger.error("❌ Không tìm thấy URL") return if not accounts: logger.error("❌ Không tìm thấy tài khoản") return logger.info(f"🚀 Bắt đầu xử lý {len(accounts)} tài khoản") threads = [] for i, (username, password) in enumerate(accounts, 1): thread = threading.Thread( target=process_account, args=(username, password, i, website_url, api_key) ) threads.append(thread) thread.start() time.sleep(uniform(2.0, 5.0)) for thread in threads: thread.join() logger.info("🏁 Hoàn thành xử lý tất cả tài khoản") if __name__ == "__main__": main()