Перейти к основному содержимому

Python Сниппеты

Полезные сниппеты для разработки на Python.

Работа с файлами и данными

Чтение/запись файлов

import json
import csv
import pickle
from pathlib import Path

# Работа с JSON
def read_json(file_path):
"""Читает JSON файл"""
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)

def write_json(data, file_path, indent=2):
"""Записывает данные в JSON файл"""
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=indent, ensure_ascii=False)

# Работа с CSV
def read_csv(file_path):
"""Читает CSV файл как список словарей"""
with open(file_path, 'r', encoding='utf-8') as f:
return list(csv.DictReader(f))

def write_csv(data, file_path, fieldnames=None):
"""Записывает данные в CSV файл"""
if not fieldnames and data:
fieldnames = data[0].keys()

with open(file_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)

# Работа с текстовыми файлами
def read_file_lines(file_path):
"""Читает файл построчно"""
with open(file_path, 'r', encoding='utf-8') as f:
return [line.strip() for line in f]

def write_file_lines(lines, file_path):
"""Записывает строки в файл"""
with open(file_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))

# Работа с Path
def ensure_dir(directory):
"""Создает директорию если она не существует"""
Path(directory).mkdir(parents=True, exist_ok=True)

def get_files_by_extension(directory, extension):
"""Получает все файлы с определенным расширением"""
return list(Path(directory).glob(f"*.{extension}"))

Сериализация и десериализация

import pickle
import base64
import gzip

def serialize_object(obj, file_path):
"""Сериализует объект в файл"""
with open(file_path, 'wb') as f:
pickle.dump(obj, f)

def deserialize_object(file_path):
"""Десериализует объект из файла"""
with open(file_path, 'rb') as f:
return pickle.load(f)

def compress_data(data):
"""Сжимает данные с помощью gzip"""
return gzip.compress(data.encode('utf-8'))

def decompress_data(compressed_data):
"""Распаковывает данные"""
return gzip.decompress(compressed_data).decode('utf-8')

def encode_base64(data):
"""Кодирует данные в base64"""
return base64.b64encode(data.encode('utf-8')).decode('utf-8')

def decode_base64(encoded_data):
"""Декодирует данные из base64"""
return base64.b64decode(encoded_data).decode('utf-8')

Веб-разработка

HTTP запросы

import requests
import asyncio
import aiohttp
from typing import Dict, List

def make_request(url, method='GET', headers=None, data=None, timeout=30):
"""Выполняет HTTP запрос"""
try:
response = requests.request(
method=method,
url=url,
headers=headers,
json=data,
timeout=timeout
)
response.raise_for_status()
return response.json() if response.content else None
except requests.RequestException as e:
print(f"Request failed: {e}")
return None

def download_file(url, file_path, chunk_size=8192):
"""Скачивает файл по URL"""
response = requests.get(url, stream=True)
response.raise_for_status()

with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
f.write(chunk)

# Асинхронные запросы
async def fetch_url(session, url):
"""Асинхронно получает данные по URL"""
async with session.get(url) as response:
return await response.json()

async def fetch_multiple_urls(urls):
"""Асинхронно получает данные по нескольким URLs"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)

# API клиент
class APIClient:
def __init__(self, base_url, api_key=None):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()

if api_key:
self.session.headers.update({'Authorization': f'Bearer {api_key}'})

def get(self, endpoint, params=None):
url = f"{self.base_url}/{endpoint.lstrip('/')}"
return self.session.get(url, params=params).json()

def post(self, endpoint, data=None):
url = f"{self.base_url}/{endpoint.lstrip('/')}"
return self.session.post(url, json=data).json()

def put(self, endpoint, data=None):
url = f"{self.base_url}/{endpoint.lstrip('/')}"
return self.session.put(url, json=data).json()

def delete(self, endpoint):
url = f"{self.base_url}/{endpoint.lstrip('/')}"
return self.session.delete(url).json()

Flask утилиты

from flask import Flask, request, jsonify
from functools import wraps
import jwt
from datetime import datetime, timedelta

app = Flask(__name__)

# Декоратор для проверки API ключа
def require_api_key(f):
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key or api_key != app.config.get('API_KEY'):
return jsonify({'error': 'Invalid API key'}), 401
return f(*args, **kwargs)
return decorated_function

# Декоратор для проверки JWT токена
def require_jwt(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'error': 'Token missing'}), 401

try:
token = token.split(' ')[1] # Bearer TOKEN
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token'}), 401

return f(*args, **kwargs)
return decorated_function

# Middleware для логирования
@app.before_request
def log_request_info():
app.logger.info(f"{request.method} {request.url} - {request.remote_addr}")

# Обработчик ошибок
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Not found'}), 404

@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': 'Internal server error'}), 500

Работа с базами данных

SQLite

import sqlite3
from contextlib import contextmanager

class SQLiteDB:
def __init__(self, db_path):
self.db_path = db_path
self.init_db()

@contextmanager
def get_connection(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()

def init_db(self):
with self.get_connection() as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()

def create_user(self, name, email):
with self.get_connection() as conn:
cursor = conn.execute(
'INSERT INTO users (name, email) VALUES (?, ?)',
(name, email)
)
conn.commit()
return cursor.lastrowid

def get_user(self, user_id):
with self.get_connection() as conn:
cursor = conn.execute('SELECT * FROM users WHERE id = ?', (user_id,))
return dict(cursor.fetchone()) if cursor.fetchone() else None

def get_all_users(self):
with self.get_connection() as conn:
cursor = conn.execute('SELECT * FROM users ORDER BY created_at DESC')
return [dict(row) for row in cursor.fetchall()]

def update_user(self, user_id, name=None, email=None):
fields = []
values = []

if name:
fields.append('name = ?')
values.append(name)
if email:
fields.append('email = ?')
values.append(email)

values.append(user_id)

with self.get_connection() as conn:
conn.execute(
f'UPDATE users SET {", ".join(fields)} WHERE id = ?',
values
)
conn.commit()

def delete_user(self, user_id):
with self.get_connection() as conn:
conn.execute('DELETE FROM users WHERE id = ?', (user_id,))
conn.commit()

PostgreSQL с psycopg2

import psycopg2
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager

class PostgreSQLDB:
def __init__(self, host, database, user, password, port=5432):
self.connection_params = {
'host': host,
'database': database,
'user': user,
'password': password,
'port': port
}

@contextmanager
def get_connection(self):
conn = psycopg2.connect(**self.connection_params)
try:
yield conn
finally:
conn.close()

def execute_query(self, query, params=None, fetch=False):
with self.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)

if fetch:
return [dict(row) for row in cursor.fetchall()]

conn.commit()
return cursor.rowcount

def create_user(self, name, email):
query = 'INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id'
result = self.execute_query(query, (name, email), fetch=True)
return result[0]['id'] if result else None

def get_users(self, limit=None):
query = 'SELECT * FROM users ORDER BY created_at DESC'
if limit:
query += f' LIMIT {limit}'
return self.execute_query(query, fetch=True)

Утилиты и помощники

Декораторы

import time
import functools
from typing import Any, Callable

def timer(func):
"""Декоратор для измерения времени выполнения функции"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} executed in {end_time - start_time:.4f} seconds")
return result
return wrapper

def retry(max_attempts=3, delay=1):
"""Декоратор для повторных попыток выполнения функции"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
raise e
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay} seconds...")
time.sleep(delay)
return wrapper
return decorator

def cache_result(ttl_seconds=300):
"""Декоратор для кеширования результатов функции"""
def decorator(func):
cache = {}

@functools.wraps(func)
def wrapper(*args, **kwargs):
key = str(args) + str(sorted(kwargs.items()))
current_time = time.time()

if key in cache:
result, timestamp = cache[key]
if current_time - timestamp < ttl_seconds:
return result

result = func(*args, **kwargs)
cache[key] = (result, current_time)
return result
return wrapper
return decorator

def validate_types(**expected_types):
"""Декоратор для валидации типов аргументов"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Получаем имена параметров
param_names = func.__code__.co_varnames[:func.__code__.co_argcount]

# Проверяем позиционные аргументы
for i, (arg, param_name) in enumerate(zip(args, param_names)):
if param_name in expected_types:
expected_type = expected_types[param_name]
if not isinstance(arg, expected_type):
raise TypeError(f"Argument {param_name} must be {expected_type.__name__}")

# Проверяем именованные аргументы
for param_name, value in kwargs.items():
if param_name in expected_types:
expected_type = expected_types[param_name]
if not isinstance(value, expected_type):
raise TypeError(f"Argument {param_name} must be {expected_type.__name__}")

return func(*args, **kwargs)
return wrapper
return decorator

Конфигурация и настройки

import os
import configparser
from dataclasses import dataclass
from typing import Optional

@dataclass
class Config:
"""Класс конфигурации приложения"""
database_url: str
secret_key: str
debug: bool = False
port: int = 5000
host: str = '0.0.0.0'

@classmethod
def from_env(cls):
"""Создает конфигурацию из переменных окружения"""
return cls(
database_url=os.getenv('DATABASE_URL', 'sqlite:///app.db'),
secret_key=os.getenv('SECRET_KEY', 'dev-secret-key'),
debug=os.getenv('DEBUG', 'False').lower() == 'true',
port=int(os.getenv('PORT', 5000)),
host=os.getenv('HOST', '0.0.0.0')
)

@classmethod
def from_file(cls, config_path):
"""Создает конфигурацию из INI файла"""
config = configparser.ConfigParser()
config.read(config_path)

return cls(
database_url=config.get('database', 'url'),
secret_key=config.get('app', 'secret_key'),
debug=config.getboolean('app', 'debug', fallback=False),
port=config.getint('app', 'port', fallback=5000),
host=config.get('app', 'host', fallback='0.0.0.0')
)

# Singleton для глобальной конфигурации
class Settings:
_instance = None
_config = None

def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance

def load_config(self, config: Config):
self._config = config

@property
def config(self) -> Config:
if self._config is None:
self._config = Config.from_env()
return self._config

# Использование
settings = Settings()

Логирование

import logging
import logging.handlers
from pathlib import Path

def setup_logging(
name: str = 'app',
level: str = 'INFO',
log_file: Optional[str] = None,
max_bytes: int = 10485760, # 10MB
backup_count: int = 5
):
"""Настраивает логирование"""
logger = logging.getLogger(name)
logger.setLevel(getattr(logging, level.upper()))

# Формат сообщений
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Консольный обработчик
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Файловый обработчик с ротацией
if log_file:
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)

file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=max_bytes,
backupCount=backup_count
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

return logger

# Контекстный менеджер для логирования
class LogContext:
def __init__(self, logger, message, level='INFO'):
self.logger = logger
self.message = message
self.level = level
self.start_time = None

def __enter__(self):
self.start_time = time.time()
self.logger.log(getattr(logging, self.level), f"Starting: {self.message}")
return self

def __exit__(self, exc_type, exc_val, exc_tb):
duration = time.time() - self.start_time
if exc_type is None:
self.logger.log(getattr(logging, self.level), f"Completed: {self.message} in {duration:.2f}s")
else:
self.logger.error(f"Failed: {self.message} after {duration:.2f}s - {exc_val}")