Chapter 23: Application Programming Interfaces (APIs) (v0.23)

This commit is contained in:
Miguel Grinberg 2017-11-01 23:43:40 -07:00
parent b11f0b62d6
commit 1cacb96692
No known key found for this signature in database
10 changed files with 297 additions and 4 deletions

View File

@ -55,6 +55,9 @@ def create_app(config_class=Config):
from app.cli import bp as cli_bp
app.register_blueprint(cli_bp)
from app.api import bp as api_bp
app.register_blueprint(api_bp, url_prefix='/api')
if not app.debug and not app.testing:
if app.config['MAIL_SERVER']:
auth = None

5
app/api/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from flask import Blueprint
bp = Blueprint('api', __name__)
from app.api import users, errors, tokens

30
app/api/auth.py Normal file
View File

@ -0,0 +1,30 @@
import sqlalchemy as sa
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
from app import db
from app.models import User
from app.api.errors import error_response
basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth()
@basic_auth.verify_password
def verify_password(username, password):
user = db.session.scalar(sa.select(User).where(User.username == username))
if user and user.check_password(password):
return user
@basic_auth.error_handler
def basic_auth_error(status):
return error_response(status)
@token_auth.verify_token
def verify_token(token):
return User.check_token(token) if token else None
@token_auth.error_handler
def token_auth_error(status):
return error_response(status)

19
app/api/errors.py Normal file
View File

@ -0,0 +1,19 @@
from werkzeug.http import HTTP_STATUS_CODES
from werkzeug.exceptions import HTTPException
from app.api import bp
def error_response(status_code, message=None):
payload = {'error': HTTP_STATUS_CODES.get(status_code, 'Unknown error')}
if message:
payload['message'] = message
return payload, status_code
def bad_request(message):
return error_response(400, message)
@bp.errorhandler(HTTPException)
def handle_exception(e):
return error_response(e.code)

19
app/api/tokens.py Normal file
View File

@ -0,0 +1,19 @@
from app import db
from app.api import bp
from app.api.auth import basic_auth, token_auth
@bp.route('/tokens', methods=['POST'])
@basic_auth.login_required
def get_token():
token = basic_auth.current_user().get_token()
db.session.commit()
return {'token': token}
@bp.route('/tokens', methods=['DELETE'])
@token_auth.login_required
def revoke_token():
token_auth.current_user().revoke_token()
db.session.commit()
return '', 204

81
app/api/users.py Normal file
View File

@ -0,0 +1,81 @@
import sqlalchemy as sa
from flask import request, url_for, abort
from app import db
from app.models import User
from app.api import bp
from app.api.auth import token_auth
from app.api.errors import bad_request
@bp.route('/users/<int:id>', methods=['GET'])
@token_auth.login_required
def get_user(id):
return db.get_or_404(User, id).to_dict()
@bp.route('/users', methods=['GET'])
@token_auth.login_required
def get_users():
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
return User.to_collection_dict(sa.select(User), page, per_page,
'api.get_users')
@bp.route('/users/<int:id>/followers', methods=['GET'])
@token_auth.login_required
def get_followers(id):
user = db.get_or_404(User, id)
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
return User.to_collection_dict(user.followers.select(), page, per_page,
'api.get_followers', id=id)
@bp.route('/users/<int:id>/following', methods=['GET'])
@token_auth.login_required
def get_following(id):
user = db.get_or_404(User, id)
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
return User.to_collection_dict(user.following.select(), page, per_page,
'api.get_following', id=id)
@bp.route('/users', methods=['POST'])
def create_user():
data = request.get_json()
if 'username' not in data or 'email' not in data or 'password' not in data:
return bad_request('must include username, email and password fields')
if db.session.scalar(sa.select(User).where(
User.username == data['username'])):
return bad_request('please use a different username')
if db.session.scalar(sa.select(User).where(
User.email == data['email'])):
return bad_request('please use a different email address')
user = User()
user.from_dict(data, new_user=True)
db.session.add(user)
db.session.commit()
return user.to_dict(), 201, {'Location': url_for('api.get_user',
id=user.id)}
@bp.route('/users/<int:id>', methods=['PUT'])
@token_auth.login_required
def update_user(id):
if token_auth.current_user().id != id:
abort(403)
user = db.get_or_404(User, id)
data = request.get_json()
if 'username' in data and data['username'] != user.username and \
db.session.scalar(sa.select(User).where(
User.username == data['username'])):
return bad_request('please use a different username')
if 'email' in data and data['email'] != user.email and \
db.session.scalar(sa.select(User).where(
User.email == data['email'])):
return bad_request('please use a different email address')
user.from_dict(data, new_user=False)
db.session.commit()
return user.to_dict()

View File

@ -1,14 +1,24 @@
from flask import render_template
from flask import render_template, request
from app import db
from app.errors import bp
from app.api.errors import error_response as api_error_response
def wants_json_response():
return request.accept_mimetypes['application/json'] >= \
request.accept_mimetypes['text/html']
@bp.app_errorhandler(404)
def not_found_error(error):
if wants_json_response():
return api_error_response(404)
return render_template('errors/404.html'), 404
@bp.app_errorhandler(500)
def internal_error(error):
db.session.rollback()
if wants_json_response():
return api_error_response(500)
return render_template('errors/500.html'), 500

View File

@ -1,11 +1,12 @@
from datetime import datetime, timezone
from datetime import datetime, timezone, timedelta
from hashlib import md5
import json
import secrets
from time import time
from typing import Optional
import sqlalchemy as sa
import sqlalchemy.orm as so
from flask import current_app
from flask import current_app, url_for
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
@ -59,6 +60,31 @@ db.event.listen(db.session, 'before_commit', SearchableMixin.before_commit)
db.event.listen(db.session, 'after_commit', SearchableMixin.after_commit)
class PaginatedAPIMixin(object):
@staticmethod
def to_collection_dict(query, page, per_page, endpoint, **kwargs):
resources = db.paginate(query, page=page, per_page=per_page,
error_out=False)
data = {
'items': [item.to_dict() for item in resources.items],
'_meta': {
'page': page,
'per_page': per_page,
'total_pages': resources.pages,
'total_items': resources.total
},
'_links': {
'self': url_for(endpoint, page=page, per_page=per_page,
**kwargs),
'next': url_for(endpoint, page=page + 1, per_page=per_page,
**kwargs) if resources.has_next else None,
'prev': url_for(endpoint, page=page - 1, per_page=per_page,
**kwargs) if resources.has_prev else None
}
}
return data
followers = sa.Table(
'followers',
db.metadata,
@ -69,7 +95,7 @@ followers = sa.Table(
)
class User(UserMixin, db.Model):
class User(PaginatedAPIMixin, UserMixin, db.Model):
id: so.Mapped[int] = so.mapped_column(primary_key=True)
username: so.Mapped[str] = so.mapped_column(sa.String(64), index=True,
unique=True)
@ -80,6 +106,9 @@ class User(UserMixin, db.Model):
last_seen: so.Mapped[Optional[datetime]] = so.mapped_column(
default=lambda: datetime.now(timezone.utc))
last_message_read_time: so.Mapped[Optional[datetime]]
token: so.Mapped[Optional[str]] = so.mapped_column(
sa.String(32), index=True, unique=True)
token_expiration: so.Mapped[Optional[datetime]]
posts: so.WriteOnlyMapped['Post'] = so.relationship(
back_populates='author')
@ -194,6 +223,61 @@ class User(UserMixin, db.Model):
Task.complete == False)
return db.session.scalar(query)
def posts_count(self):
query = sa.select(sa.func.count()).select_from(
self.posts.select().subquery())
return db.session.scalar(query)
def to_dict(self, include_email=False):
data = {
'id': self.id,
'username': self.username,
'last_seen': self.last_seen.replace(
tzinfo=timezone.utc).isoformat(),
'about_me': self.about_me,
'post_count': self.posts_count(),
'follower_count': self.followers_count(),
'following_count': self.following_count(),
'_links': {
'self': url_for('api.get_user', id=self.id),
'followers': url_for('api.get_followers', id=self.id),
'following': url_for('api.get_following', id=self.id),
'avatar': self.avatar(128)
}
}
if include_email:
data['email'] = self.email
return data
def from_dict(self, data, new_user=False):
for field in ['username', 'email', 'about_me']:
if field in data:
setattr(self, field, data[field])
if new_user and 'password' in data:
self.set_password(data['password'])
def get_token(self, expires_in=3600):
now = datetime.now(timezone.utc)
if self.token and self.token_expiration.replace(
tzinfo=timezone.utc) > now + timedelta(seconds=60):
return self.token
self.token = secrets.token_hex(16)
self.token_expiration = now + timedelta(seconds=expires_in)
db.session.add(self)
return self.token
def revoke_token(self):
self.token_expiration = datetime.now(timezone.utc) - timedelta(
seconds=1)
@staticmethod
def check_token(token):
user = db.session.scalar(sa.select(User).where(User.token == token))
if user is None or user.token_expiration.replace(
tzinfo=timezone.utc) < datetime.now(timezone.utc):
return None
return user
@login.user_loader
def load_user(id):

View File

@ -0,0 +1,32 @@
"""user tokens
Revision ID: 834b1a697901
Revises: c81bac34faab
Create Date: 2017-11-05 18:41:07.996137
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '834b1a697901'
down_revision = 'c81bac34faab'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user', sa.Column('token', sa.String(length=32), nullable=True))
op.add_column('user', sa.Column('token_expiration', sa.DateTime(), nullable=True))
op.create_index(op.f('ix_user_token'), 'user', ['token'], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_user_token'), table_name='user')
op.drop_column('user', 'token_expiration')
op.drop_column('user', 'token')
# ### end Alembic commands ###

View File

@ -7,12 +7,14 @@ blinker==1.7.0
certifi==2023.11.17
charset-normalizer==3.3.2
click==8.1.7
defusedxml==0.7.1
dnspython==2.4.2
elastic-transport==8.10.0
elasticsearch==8.11.0
email-validator==2.1.0.post1
Flask==3.0.0
flask-babel==4.0.0
Flask-HTTPAuth==4.8.0
Flask-Login==0.6.3
Flask-Mail==0.9.1
Flask-Migrate==4.0.5
@ -21,19 +23,27 @@ Flask-SQLAlchemy==3.1.1
Flask-WTF==1.2.1
greenlet==3.0.1
gunicorn==21.2.0
httpie==3.2.2
idna==3.4
itsdangerous==2.1.2
Jinja2==3.1.2
langdetect==1.0.9
Mako==1.3.0
markdown-it-py==3.0.0
MarkupSafe==2.1.3
mdurl==0.1.2
multidict==6.0.4
packaging==23.2
psycopg2-binary==2.9.9
Pygments==2.17.1
PyJWT==2.8.0
PySocks==1.7.1
python-dotenv==1.0.0
pytz==2023.3.post1
redis==5.0.1
requests==2.31.0
requests-toolbelt==1.0.0
rich==13.7.0
rq==1.15.1
setuptools==68.2.2
six==1.16.0