Перейти к основному содержимому

Python Веб-фреймворки

Настройка и использование популярных Python веб-фреймворков.

FastAPI

Установка и настройка

pip install fastapi uvicorn python-multipart

Базовое приложение

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
import uvicorn

app = FastAPI(title="API Example", version="1.0.0")

class User(BaseModel):
id: Optional[int] = None
name: str
email: str
age: int

users_db = []

@app.get("/")
async def root():
return {"message": "Hello World"}

@app.get("/users", response_model=List[User])
async def get_users():
return users_db

@app.post("/users", response_model=User)
async def create_user(user: User):
user.id = len(users_db) + 1
users_db.append(user)
return user

@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
for user in users_db:
if user.id == user_id:
return user
raise HTTPException(status_code=404, detail="User not found")

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

Аутентификация JWT

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
import os

SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()

def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
return username
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/login")
async def login(username: str, password: str):
# Проверка пользователя в базе данных
if verify_password(password, user_password):
access_token = create_access_token(data={"sub": username})
return {"access_token": access_token, "token_type": "bearer"}
raise HTTPException(status_code=401, detail="Invalid credentials")

@app.get("/protected")
async def protected_route(current_user: str = Depends(verify_token)):
return {"message": f"Hello {current_user}"}

Django REST Framework

Установка

pip install django djangorestframework django-cors-headers

Настройка settings.py

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'corsheaders',
'api',
]

MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 20
}

CORS_ALLOWED_ORIGINS = [
"http://localhost:3000",
"http://127.0.0.1:3000",
]

Модели и сериализаторы

# models.py
from django.db import models
from django.contrib.auth.models import User

class Post(models.Model):
title = models.CharField(max_length=200)
content = models.TextField()
author = models.ForeignKey(User, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)

class Meta:
ordering = ['-created_at']

# serializers.py
from rest_framework import serializers
from .models import Post

class PostSerializer(serializers.ModelSerializer):
author = serializers.StringRelatedField()

class Meta:
model = Post
fields = ['id', 'title', 'content', 'author', 'created_at', 'updated_at']
read_only_fields = ['author', 'created_at', 'updated_at']

# views.py
from rest_framework import viewsets, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from .models import Post
from .serializers import PostSerializer

class PostViewSet(viewsets.ModelViewSet):
queryset = Post.objects.all()
serializer_class = PostSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly]

def perform_create(self, serializer):
serializer.save(author=self.request.user)

@action(detail=False, methods=['get'])
def my_posts(self, request):
posts = Post.objects.filter(author=request.user)
serializer = self.get_serializer(posts, many=True)
return Response(serializer.data)

Flask

Базовое приложение

from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
cors = CORS(app)
jwt = JWTManager(app)

class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)

def to_dict(self):
return {
'id': self.id,
'username': self.username,
'email': self.email
}

@app.route('/api/users', methods=['GET'])
def get_users():
users = User.query.all()
return jsonify([user.to_dict() for user in users])

@app.route('/api/users', methods=['POST'])
@jwt_required()
def create_user():
data = request.get_json()
user = User(username=data['username'], email=data['email'])
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict()), 201

@app.route('/api/login', methods=['POST'])
def login():
data = request.get_json()
# Проверка пользователя
access_token = create_access_token(identity=data['username'])
return jsonify(access_token=access_token)

if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(debug=True)

Telegram Bot

Установка и настройка

pip install python-telegram-bot

Простой бот

import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler

# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)

TOKEN = "YOUR_BOT_TOKEN"

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
keyboard = [
[InlineKeyboardButton("Помощь", callback_data='help')],
[InlineKeyboardButton("О боте", callback_data='about')]
]
reply_markup = InlineKeyboardMarkup(keyboard)

await update.message.reply_text(
'Привет! Я ваш бот-помощник.',
reply_markup=reply_markup
)

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
help_text = """
Доступные команды:
/start - Начать работу с ботом
/help - Показать это сообщение
/weather <город> - Узнать погоду
"""
await update.message.reply_text(help_text)

async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(f"Вы сказали: {update.message.text}")

async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()

if query.data == 'help':
await query.edit_message_text("Это справка по боту...")
elif query.data == 'about':
await query.edit_message_text("Информация о боте...")

def main():
application = Application.builder().token(TOKEN).build()

application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CallbackQueryHandler(button_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))

application.run_polling()

if __name__ == '__main__':
main()

Парсинг данных

BeautifulSoup + Requests

import requests
from bs4 import BeautifulSoup
import csv
import time
from urllib.parse import urljoin, urlparse
import asyncio
import aiohttp

class WebScraper:
def __init__(self, base_url, headers=None):
self.base_url = base_url
self.session = requests.Session()
if headers:
self.session.headers.update(headers)

def get_page(self, url, retries=3):
for attempt in range(retries):
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
return response
except requests.RequestException as e:
if attempt == retries - 1:
raise e
time.sleep(2 ** attempt)

def parse_product_page(self, url):
response = self.get_page(url)
soup = BeautifulSoup(response.content, 'html.parser')

title = soup.find('h1', class_='product-title')
price = soup.find('span', class_='price')
description = soup.find('div', class_='description')

return {
'title': title.text.strip() if title else None,
'price': price.text.strip() if price else None,
'description': description.text.strip() if description else None,
'url': url
}

def save_to_csv(self, data, filename):
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
if data:
fieldnames = data[0].keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)

# Асинхронный парсер
class AsyncWebScraper:
def __init__(self, headers=None):
self.headers = headers or {}

async def fetch(self, session, url):
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None

async def parse_urls(self, urls):
async with aiohttp.ClientSession(headers=self.headers) as session:
tasks = [self.fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results

Работа с базами данных

SQLAlchemy

from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime

Base = declarative_base()

class User(Base):
__tablename__ = 'users'

id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)

posts = relationship("Post", back_populates="author")

class Post(Base):
__tablename__ = 'posts'

id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text)
user_id = Column(Integer, ForeignKey('users.id'))
created_at = Column(DateTime, default=datetime.utcnow)

author = relationship("User", back_populates="posts")

# Подключение к базе
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def create_user(username, email):
session = Session()
try:
user = User(username=username, email=email)
session.add(user)
session.commit()
return user.id
finally:
session.close()

def get_user_posts(user_id):
session = Session()
try:
user = session.query(User).filter(User.id == user_id).first()
return [{"title": post.title, "content": post.content} for post in user.posts]
finally:
session.close()

Данные и аналитика

Pandas + NumPy

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

def analyze_sales_data(csv_file):
# Загрузка данных
df = pd.read_csv(csv_file)

# Очистка данных
df['date'] = pd.to_datetime(df['date'])
df = df.dropna()

# Анализ
monthly_sales = df.groupby(df['date'].dt.to_period('M'))['amount'].sum()

# Визуализация
plt.figure(figsize=(12, 6))
monthly_sales.plot(kind='bar')
plt.title('Monthly Sales')
plt.xlabel('Month')
plt.ylabel('Sales Amount')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('sales_analysis.png')

return {
'total_sales': df['amount'].sum(),
'average_sale': df['amount'].mean(),
'best_month': monthly_sales.idxmax(),
'growth_rate': calculate_growth_rate(monthly_sales)
}

def calculate_growth_rate(series):
if len(series) < 2:
return 0
return ((series.iloc[-1] - series.iloc[0]) / series.iloc[0]) * 100

Машинное обучение

Scikit-learn

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import StandardScaler
import joblib

def train_model(X, y):
# Разделение данных
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Масштабирование
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Обучение модели
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train_scaled, y_train)

# Оценка
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)

# Сохранение модели
joblib.dump(model, 'model.pkl')
joblib.dump(scaler, 'scaler.pkl')

return {
'accuracy': accuracy,
'report': classification_report(y_test, y_pred)
}

def predict(features):
model = joblib.load('model.pkl')
scaler = joblib.load('scaler.pkl')

features_scaled = scaler.transform([features])
prediction = model.predict(features_scaled)
probability = model.predict_proba(features_scaled)

return prediction[0], probability[0]