Python Веб-фреймворки
Настройка и использование популярных Python веб-фреймворков.
FastAPI
Установка и настройка
pip install fastapi uvicorn python-multipart
Базовое приложение
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
app = FastAPI(title="API Example", version="1.0.0")
class User(BaseModel):
id: Optional[int] = None
name: str
email: str
age: int
users_db = []
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/users", response_model=List[User])
async def get_users():
return users_db
@app.post("/users", response_model=User)
async def create_user(user: User):
user.id = len(users_db) + 1
users_db.append(user)
return user
@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
for user in users_db:
if user.id == user_id:
return user
raise HTTPException(status_code=404, detail="User not found")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Аутентификация JWT
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
import os
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
return username
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
@app.post("/login")
async def login(username: str, password: str):
# Проверка пользователя в базе данных
if verify_password(password, user_password):
access_token = create_access_token(data={"sub": username})
return {"access_token": access_token, "token_type": "bearer"}
raise HTTPException(status_code=401, detail="Invalid credentials")
@app.get("/protected")
async def protected_route(current_user: str = Depends(verify_token)):
return {"message": f"Hello {current_user}"}
Django REST Framework
Установка
pip install django djangorestframework django-cors-headers
Настройка settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'corsheaders',
'api',
]
MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 20
}
CORS_ALLOWED_ORIGINS = [
"http://localhost:3000",
"http://127.0.0.1:3000",
]
Модели и сериализаторы
# models.py
from django.db import models
from django.contrib.auth.models import User
class Post(models.Model):
title = models.CharField(max_length=200)
content = models.TextField()
author = models.ForeignKey(User, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['-created_at']
# serializers.py
from rest_framework import serializers
from .models import Post
class PostSerializer(serializers.ModelSerializer):
author = serializers.StringRelatedField()
class Meta:
model = Post
fields = ['id', 'title', 'content', 'author', 'created_at', 'updated_at']
read_only_fields = ['author', 'created_at', 'updated_at']
# views.py
from rest_framework import viewsets, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from .models import Post
from .serializers import PostSerializer
class PostViewSet(viewsets.ModelViewSet):
queryset = Post.objects.all()
serializer_class = PostSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly]
def perform_create(self, serializer):
serializer.save(author=self.request.user)
@action(detail=False, methods=['get'])
def my_posts(self, request):
posts = Post.objects.filter(author=request.user)
serializer = self.get_serializer(posts, many=True)
return Response(serializer.data)
Flask
Базовое приложение
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
import os
app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
cors = CORS(app)
jwt = JWTManager(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'email': self.email
}
@app.route('/api/users', methods=['GET'])
def get_users():
users = User.query.all()
return jsonify([user.to_dict() for user in users])
@app.route('/api/users', methods=['POST'])
@jwt_required()
def create_user():
data = request.get_json()
user = User(username=data['username'], email=data['email'])
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict()), 201
@app.route('/api/login', methods=['POST'])
def login():
data = request.get_json()
# Проверка пользователя
access_token = create_access_token(identity=data['username'])
return jsonify(access_token=access_token)
if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(debug=True)
Telegram Bot
Установка и настройка
pip install python-telegram-bot
Простой бот
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
TOKEN = "YOUR_BOT_TOKEN"
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
keyboard = [
[InlineKeyboardButton("Помощь", callback_data='help')],
[InlineKeyboardButton("О боте", callback_data='about')]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
'Привет! Я ваш бот-помощник.',
reply_markup=reply_markup
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
help_text = """
Доступные команды:
/start - Начать работу с ботом
/help - Показать это сообщение
/weather <город> - Узнать погоду
"""
await update.message.reply_text(help_text)
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(f"Вы сказали: {update.message.text}")
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
if query.data == 'help':
await query.edit_message_text("Это справка по боту...")
elif query.data == 'about':
await query.edit_message_text("Информация о боте...")
def main():
application = Application.builder().token(TOKEN).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CallbackQueryHandler(button_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
application.run_polling()
if __name__ == '__main__':
main()
Парсинг данных
BeautifulSoup + Requests
import requests
from bs4 import BeautifulSoup
import csv
import time
from urllib.parse import urljoin, urlparse
import asyncio
import aiohttp
class WebScraper:
def __init__(self, base_url, headers=None):
self.base_url = base_url
self.session = requests.Session()
if headers:
self.session.headers.update(headers)
def get_page(self, url, retries=3):
for attempt in range(retries):
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
return response
except requests.RequestException as e:
if attempt == retries - 1:
raise e
time.sleep(2 ** attempt)
def parse_product_page(self, url):
response = self.get_page(url)
soup = BeautifulSoup(response.content, 'html.parser')
title = soup.find('h1', class_='product-title')
price = soup.find('span', class_='price')
description = soup.find('div', class_='description')
return {
'title': title.text.strip() if title else None,
'price': price.text.strip() if price else None,
'description': description.text.strip() if description else None,
'url': url
}
def save_to_csv(self, data, filename):
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
if data:
fieldnames = data[0].keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
# Асинхронный парсер
class AsyncWebScraper:
def __init__(self, headers=None):
self.headers = headers or {}
async def fetch(self, session, url):
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def parse_urls(self, urls):
async with aiohttp.ClientSession(headers=self.headers) as session:
tasks = [self.fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
Работа с базами данных
SQLAlchemy
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
posts = relationship("Post", back_populates="author")
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text)
user_id = Column(Integer, ForeignKey('users.id'))
created_at = Column(DateTime, default=datetime.utcnow)
author = relationship("User", back_populates="posts")
# Подключение к базе
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
def create_user(username, email):
session = Session()
try:
user = User(username=username, email=email)
session.add(user)
session.commit()
return user.id
finally:
session.close()
def get_user_posts(user_id):
session = Session()
try:
user = session.query(User).filter(User.id == user_id).first()
return [{"title": post.title, "content": post.content} for post in user.posts]
finally:
session.close()
Данные и аналитика
Pandas + NumPy
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
def analyze_sales_data(csv_file):
# Загрузка данных
df = pd.read_csv(csv_file)
# Очистка данных
df['date'] = pd.to_datetime(df['date'])
df = df.dropna()
# Анализ
monthly_sales = df.groupby(df['date'].dt.to_period('M'))['amount'].sum()
# Визуализация
plt.figure(figsize=(12, 6))
monthly_sales.plot(kind='bar')
plt.title('Monthly Sales')
plt.xlabel('Month')
plt.ylabel('Sales Amount')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('sales_analysis.png')
return {
'total_sales': df['amount'].sum(),
'average_sale': df['amount'].mean(),
'best_month': monthly_sales.idxmax(),
'growth_rate': calculate_growth_rate(monthly_sales)
}
def calculate_growth_rate(series):
if len(series) < 2:
return 0
return ((series.iloc[-1] - series.iloc[0]) / series.iloc[0]) * 100
Машинное обучение
Scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import StandardScaler
import joblib
def train_model(X, y):
# Разделение данных
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Масштабирование
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Обучение модели
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train_scaled, y_train)
# Оценка
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
# Сохранение модели
joblib.dump(model, 'model.pkl')
joblib.dump(scaler, 'scaler.pkl')
return {
'accuracy': accuracy,
'report': classification_report(y_test, y_pred)
}
def predict(features):
model = joblib.load('model.pkl')
scaler = joblib.load('scaler.pkl')
features_scaled = scaler.transform([features])
prediction = model.predict(features_scaled)
probability = model.predict_proba(features_scaled)
return prediction[0], probability[0]