# DVDFabExtendedLicense/docs/server.py
#
# 166 lines
# 5.2 KiB
# Python
# Raw Normal View History
#
# 2024-09-30 09:51:07 +00:00
import re
import secrets
import string
import requests
import xmltodict
from flask import Flask, request, Response
from requests_toolbelt import MultipartEncoder
# utils.py
def parse_boundary(data: str) -> dict:
    """
    Parse the multipart form data to extract simple text fields.

    The body is cut wherever a run of 26 dashes occurs; everything before the
    first such run is discarded. Within each remaining chunk, line 0 is the
    rest of the delimiter line, line 1 must be the Content-Disposition header,
    line 2 is treated as the blank separator, and lines 3+ are the value.

    NOTE(review): this assumes the client's delimiter lines start with exactly
    26 dashes and that each part carries a single header line -- confirm
    against the real client before relying on it for other senders.

    Parameters:
    - data (str): The raw multipart form data as a string.
    Returns:
    - dict: A dictionary with form field names as keys and field values as
      values. Multi-line values are re-joined with '\\n'. Parts whose header
      is not a plain ``form-data; name="..."`` line (for example file parts
      with a ``filename=`` attribute) are skipped.
    """
    # Compiled once so the loop does not repeat the pattern lookup.
    disposition = re.compile(
        r'^Content-Disposition:\s*form-data;\s*name="([^"]+)"\s*$'
    )

    fields = {}
    # Split parts using the boundary (separator) and process each part
    for item in data.split('-' * 26)[1:]:
        lines = item.splitlines()
        if len(lines) < 3:
            # Too short to hold header + blank line (e.g. the closing marker).
            continue
        matched = disposition.match(lines[1])
        if matched is None:
            # Not a simple named field; skip it instead of failing on
            # ``None.group``.
            continue
        # Join the remaining lines as the field value
        fields[matched.group(1)] = '\n'.join(lines[3:])
    return fields
# session.py
class Session:
    """
    Holds one randomly generated identity (email + machine ID) and a counter
    of how many times it has been applied to outgoing form data.

    A fresh identity is generated on construction and again whenever
    ``patch_boundary`` is entered with the counter at 3.
    """

    DOMAINS = ['hotmail.com', 'gmail.com', 'yahoo.com', 'outlook.com', 'protonmail.com', 'yandex.com']
    CHARACTERS = string.ascii_lowercase + string.digits + '.-'

    # Value shapes recognised by patch_boundary (all matched from the start
    # of the value; only the last one is anchored at the end).
    _MAC_RE = re.compile(r'^([0-9a-f]{2}-){5}[0-9a-f]{2}(:([0-9a-f]{2}-){5}[0-9a-f]{2})?')
    _EMAIL_RE = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    _SUBSCRIPTION_RE = re.compile(r'^365$')

    def __init__(self):
        # Attribute order matters: __repr__ lists them in creation order.
        self.email = None
        self.machine_id = None
        self.usage = 0
        self.__update()

    def __update(self) -> None:
        """
        Replace the stored email and machine ID with newly generated random
        values and print both to stdout.
        """
        local_length = secrets.choice(range(5, 15))
        local_part = ''.join(
            secrets.choice(self.CHARACTERS) for _ in range(local_length)
        )
        self.email = f'{local_part}@{secrets.choice(Session.DOMAINS)}'

        # Two groups of six random bytes, each rendered as dash-separated
        # lowercase hex, joined by a colon.
        halves = []
        for _ in range(2):
            octets = secrets.token_bytes(6)
            halves.append('-'.join(format(octet, '02x') for octet in octets))
        self.machine_id = ':'.join(halves)

        print(f'Email: {self.email}')
        print(f'Machine ID: {self.machine_id}')

    def __repr__(self) -> str:
        rendered = ', '.join(
            f'{attr}={value!r}' for attr, value in self.__dict__.items()
        )
        return f'{self.__class__.__name__}({rendered})'

    def patch_boundary(self, data: dict) -> dict:
        """
        Rewrite recognised values of ``data`` in place and return it.

        A value is replaced according to its shape alone, regardless of the
        field name: MAC-like values become the stored machine ID, email-like
        values become the stored email, and the exact string ``365`` becomes
        ``trial``. Everything else is left as it is.

        Parameters:
        - data (dict): A dictionary containing form field names and values.
        Returns:
        - dict: The same dictionary object, with matching values replaced.
        """
        # Every fourth call starts over with a new identity.
        if self.usage == 3:
            self.usage = 0
            self.__update()

        for field, current in data.items():
            if self._MAC_RE.match(current):
                replacement = self.machine_id
            elif self._EMAIL_RE.match(current):
                replacement = self.email
            elif self._SUBSCRIPTION_RE.match(current):
                replacement = 'trial'
            else:
                continue
            # Only existing keys are reassigned, so iterating while writing
            # is safe.
            data[field] = replacement

        self.usage += 1
        return data
# app.py
# One module-wide Session instance; the request handler below reads and
# mutates it on every call.
session = Session()

app = Flask(__name__)

# Settings are written one key at a time; the resulting config is the same
# as a single bulk update.
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = secrets.token_hex(16)  # regenerated on every start
# NOTE(review): ALLOWED_HOSTS does not look like a setting Flask itself
# reads (it is a Django name) -- presumably it has no effect here; confirm.
app.config['ALLOWED_HOSTS'] = ['127.0.0.1', 'localhost']
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['POST'])
def catch_all(path: str) -> Response:
    """
    Rewrite an incoming request and forward it to the same URL over HTTPS.

    For every path except ``auth/trial_disc.php`` the request body is parsed
    as multipart form data, passed through ``session.patch_boundary`` and
    re-encoded before forwarding. For ``auth/trial_disc.php`` the body is
    forwarded untouched and the XML *response* is edited instead: a non-empty
    ``MacID`` attribute on ``TrialOpenDiscInfo`` is overwritten with the
    session's machine ID and a non-empty ``DiscInfo`` child is removed.

    Parameters:
    - path (str): The path part of the URL (used for routing).
    Returns:
    - Response: A Flask Response object with the status code and (possibly
      rewritten) content of the forwarded request. Upstream response headers
      are not copied.
    """
    is_auth = re.match(r'^auth/trial_disc\.php', path)

    headers = dict(request.headers)
    # The body may be re-encoded below, so the original length would be
    # wrong. Use pop() rather than del: either header may legitimately be
    # absent (e.g. a request without a body).
    headers.pop('Connection', None)
    headers.pop('Content-Length', None)

    body = request.get_data()
    if not is_auth:
        fields = parse_boundary(body.decode('utf-8'))
        patched = session.patch_boundary(fields)
        mp_encoder = MultipartEncoder(patched)
        # The encoder picks its own boundary, so the header must follow it.
        headers['Content-Type'] = mp_encoder.content_type
        body = mp_encoder.read()

    # Upgrade only the scheme prefix; a plain str.replace would also rewrite
    # any 'http://' that happens to appear inside the query string.
    url = request.url
    if url.startswith('http://'):
        url = 'https://' + url[len('http://'):]

    # Forward the request to the same URL over HTTPS.
    # request.url already contains the query string, so it is not passed
    # again via ``params`` (that would send every parameter twice).
    r = requests.request(
        method=request.method,
        url=url,
        data=body,
        headers=headers,
        cookies=request.cookies,
        timeout=30,  # never block a worker forever on a stalled upstream
    )

    content = r.content
    if is_auth:
        data = xmltodict.parse(content)
        info = data.get('TrialOpenDiscInfo')
        # An empty root element parses to None, so check the type before
        # treating it as a mapping.
        if isinstance(info, dict):
            if info.get('@MacID'):
                info['@MacID'] = session.machine_id
            if info.get('DiscInfo'):
                del info['DiscInfo']
        content = xmltodict.unparse(data, pretty=False, full_document=False)

    # Return the response from the forwarded request
    return Response(status=r.status_code, response=content)
# NOTE(review): these two calls register the 'catch_all' endpoint again on
# top of the @app.route decorators on the view. They omit ``methods`` and the
# second one supplies a default for ``path`` even though the rule captures
# it -- presumably this is what lets non-POST requests reach the view;
# confirm the intended routing before removing either line.
app.add_url_rule('/', 'catch_all', catch_all, defaults={'path': ''})
app.add_url_rule('/<path:path>', 'catch_all', catch_all, defaults={'path': ''})

# Run the Flask app
# Only start the development server when executed as a script, so importing
# this module (tests, tooling, another WSGI server) does not block.
if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000)