mirror of
https://github.com/miguelgrinberg/microblog.git
synced 2025-12-08 18:02:07 +00:00
Chapter 8: Followers (v0.8)
This commit is contained in:
parent
bba4b01d3b
commit
790dc4572b
@ -51,3 +51,7 @@ class EditProfileForm(FlaskForm):
|
||||
User.username == username.data))
|
||||
if user is not None:
|
||||
raise ValidationError('Please use a different username.')
|
||||
|
||||
|
||||
class EmptyForm(FlaskForm):
|
||||
submit = SubmitField('Submit')
|
||||
|
||||
@ -8,6 +8,16 @@ from werkzeug.security import generate_password_hash, check_password_hash
|
||||
from app import db, login
|
||||
|
||||
|
||||
followers = sa.Table(
|
||||
'followers',
|
||||
db.metadata,
|
||||
sa.Column('follower_id', sa.Integer, sa.ForeignKey('user.id'),
|
||||
primary_key=True),
|
||||
sa.Column('followed_id', sa.Integer, sa.ForeignKey('user.id'),
|
||||
primary_key=True)
|
||||
)
|
||||
|
||||
|
||||
class User(UserMixin, db.Model):
|
||||
id: so.Mapped[int] = so.mapped_column(primary_key=True)
|
||||
username: so.Mapped[str] = so.mapped_column(sa.String(64), index=True,
|
||||
@ -21,6 +31,14 @@ class User(UserMixin, db.Model):
|
||||
|
||||
posts: so.WriteOnlyMapped['Post'] = so.relationship(
|
||||
back_populates='author')
|
||||
following: so.WriteOnlyMapped['User'] = so.relationship(
|
||||
secondary=followers, primaryjoin=(followers.c.follower_id == id),
|
||||
secondaryjoin=(followers.c.followed_id == id),
|
||||
back_populates='followers')
|
||||
followers: so.WriteOnlyMapped['User'] = so.relationship(
|
||||
secondary=followers, primaryjoin=(followers.c.followed_id == id),
|
||||
secondaryjoin=(followers.c.follower_id == id),
|
||||
back_populates='following')
|
||||
|
||||
def __repr__(self):
|
||||
return '<User {}>'.format(self.username)
|
||||
@ -35,6 +53,43 @@ class User(UserMixin, db.Model):
|
||||
digest = md5(self.email.lower().encode('utf-8')).hexdigest()
|
||||
return f'https://www.gravatar.com/avatar/{digest}?d=identicon&s={size}'
|
||||
|
||||
def follow(self, user):
|
||||
if not self.is_following(user):
|
||||
self.following.add(user)
|
||||
|
||||
def unfollow(self, user):
|
||||
if self.is_following(user):
|
||||
self.following.remove(user)
|
||||
|
||||
def is_following(self, user):
|
||||
query = self.following.select().where(User.id == user.id)
|
||||
return db.session.scalar(query) is not None
|
||||
|
||||
def followers_count(self):
|
||||
query = sa.select(sa.func.count()).select_from(
|
||||
self.followers.select().subquery())
|
||||
return db.session.scalar(query)
|
||||
|
||||
def following_count(self):
|
||||
query = sa.select(sa.func.count()).select_from(
|
||||
self.following.select().subquery())
|
||||
return db.session.scalar(query)
|
||||
|
||||
def following_posts(self):
|
||||
Author = so.aliased(User)
|
||||
Follower = so.aliased(User)
|
||||
return (
|
||||
sa.select(Post)
|
||||
.join(Post.author.of_type(Author))
|
||||
.join(Author.followers.of_type(Follower), isouter=True)
|
||||
.where(sa.or_(
|
||||
Follower.id == self.id,
|
||||
Author.id == self.id,
|
||||
))
|
||||
.group_by(Post)
|
||||
.order_by(Post.timestamp.desc())
|
||||
)
|
||||
|
||||
|
||||
@login.user_loader
|
||||
def load_user(id):
|
||||
|
||||
@ -4,7 +4,7 @@ from flask import render_template, flash, redirect, url_for, request
|
||||
from flask_login import login_user, logout_user, current_user, login_required
|
||||
import sqlalchemy as sa
|
||||
from app import app, db
|
||||
from app.forms import LoginForm, RegistrationForm, EditProfileForm
|
||||
from app.forms import LoginForm, RegistrationForm, EditProfileForm, EmptyForm
|
||||
from app.models import User
|
||||
|
||||
|
||||
@ -80,7 +80,8 @@ def user(username):
|
||||
{'author': user, 'body': 'Test post #1'},
|
||||
{'author': user, 'body': 'Test post #2'}
|
||||
]
|
||||
return render_template('user.html', user=user, posts=posts)
|
||||
form = EmptyForm()
|
||||
return render_template('user.html', user=user, posts=posts, form=form)
|
||||
|
||||
|
||||
@app.route('/edit_profile', methods=['GET', 'POST'])
|
||||
@ -98,3 +99,45 @@ def edit_profile():
|
||||
form.about_me.data = current_user.about_me
|
||||
return render_template('edit_profile.html', title='Edit Profile',
|
||||
form=form)
|
||||
|
||||
|
||||
@app.route('/follow/<username>', methods=['POST'])
|
||||
@login_required
|
||||
def follow(username):
|
||||
form = EmptyForm()
|
||||
if form.validate_on_submit():
|
||||
user = db.session.scalar(
|
||||
sa.select(User).where(User.username == username))
|
||||
if user is None:
|
||||
flash(f'User {username} not found.')
|
||||
return redirect(url_for('index'))
|
||||
if user == current_user:
|
||||
flash('You cannot follow yourself!')
|
||||
return redirect(url_for('user', username=username))
|
||||
current_user.follow(user)
|
||||
db.session.commit()
|
||||
flash(f'You are following {username}!')
|
||||
return redirect(url_for('user', username=username))
|
||||
else:
|
||||
return redirect(url_for('index'))
|
||||
|
||||
|
||||
@app.route('/unfollow/<username>', methods=['POST'])
|
||||
@login_required
|
||||
def unfollow(username):
|
||||
form = EmptyForm()
|
||||
if form.validate_on_submit():
|
||||
user = db.session.scalar(
|
||||
sa.select(User).where(User.username == username))
|
||||
if user is None:
|
||||
flash(f'User {username} not found.')
|
||||
return redirect(url_for('index'))
|
||||
if user == current_user:
|
||||
flash('You cannot unfollow yourself!')
|
||||
return redirect(url_for('user', username=username))
|
||||
current_user.unfollow(user)
|
||||
db.session.commit()
|
||||
flash(f'You are not following {username}.')
|
||||
return redirect(url_for('user', username=username))
|
||||
else:
|
||||
return redirect(url_for('index'))
|
||||
|
||||
@ -8,8 +8,23 @@
|
||||
<h1>User: {{ user.username }}</h1>
|
||||
{% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %}
|
||||
{% if user.last_seen %}<p>Last seen on: {{ user.last_seen }}</p>{% endif %}
|
||||
<p>{{ user.followers_count() }} followers, {{ user.following_count() }} following.</p>
|
||||
{% if user == current_user %}
|
||||
<p><a href="{{ url_for('edit_profile') }}">Edit your profile</a></p>
|
||||
{% elif not current_user.is_following(user) %}
|
||||
<p>
|
||||
<form action="{{ url_for('follow', username=user.username) }}" method="post">
|
||||
{{ form.hidden_tag() }}
|
||||
{{ form.submit(value='Follow') }}
|
||||
</form>
|
||||
</p>
|
||||
{% else %}
|
||||
<p>
|
||||
<form action="{{ url_for('unfollow', username=user.username) }}" method="post">
|
||||
{{ form.hidden_tag() }}
|
||||
{{ form.submit(value='Unfollow') }}
|
||||
</form>
|
||||
</p>
|
||||
{% endif %}
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
34
migrations/versions/ae346256b650_followers.py
Normal file
34
migrations/versions/ae346256b650_followers.py
Normal file
@ -0,0 +1,34 @@
|
||||
"""followers
|
||||
|
||||
Revision ID: ae346256b650
|
||||
Revises: 37f06a334dbf
|
||||
Create Date: 2017-09-17 15:41:30.211082
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'ae346256b650'
|
||||
down_revision = '37f06a334dbf'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('followers',
|
||||
sa.Column('follower_id', sa.Integer(), nullable=False),
|
||||
sa.Column('followed_id', sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(['followed_id'], ['user.id'], ),
|
||||
sa.ForeignKeyConstraint(['follower_id'], ['user.id'], ),
|
||||
sa.PrimaryKeyConstraint('follower_id', 'followed_id')
|
||||
)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table('followers')
|
||||
# ### end Alembic commands ###
|
||||
100
tests.py
Executable file
100
tests.py
Executable file
@ -0,0 +1,100 @@
|
||||
import os
|
||||
os.environ['DATABASE_URL'] = 'sqlite://'
|
||||
|
||||
from datetime import datetime, timezone, timedelta
|
||||
import unittest
|
||||
from app import app, db
|
||||
from app.models import User, Post
|
||||
|
||||
|
||||
class UserModelCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.app_context = app.app_context()
|
||||
self.app_context.push()
|
||||
db.create_all()
|
||||
|
||||
def tearDown(self):
|
||||
db.session.remove()
|
||||
db.drop_all()
|
||||
self.app_context.pop()
|
||||
|
||||
def test_password_hashing(self):
|
||||
u = User(username='susan', email='susan@example.com')
|
||||
u.set_password('cat')
|
||||
self.assertFalse(u.check_password('dog'))
|
||||
self.assertTrue(u.check_password('cat'))
|
||||
|
||||
def test_avatar(self):
|
||||
u = User(username='john', email='john@example.com')
|
||||
self.assertEqual(u.avatar(128), ('https://www.gravatar.com/avatar/'
|
||||
'd4c74594d841139328695756648b6bd6'
|
||||
'?d=identicon&s=128'))
|
||||
|
||||
def test_follow(self):
|
||||
u1 = User(username='john', email='john@example.com')
|
||||
u2 = User(username='susan', email='susan@example.com')
|
||||
db.session.add(u1)
|
||||
db.session.add(u2)
|
||||
db.session.commit()
|
||||
following = db.session.scalars(u1.following.select()).all()
|
||||
followers = db.session.scalars(u2.followers.select()).all()
|
||||
self.assertEqual(following, [])
|
||||
self.assertEqual(followers, [])
|
||||
|
||||
u1.follow(u2)
|
||||
db.session.commit()
|
||||
self.assertTrue(u1.is_following(u2))
|
||||
self.assertEqual(u1.following_count(), 1)
|
||||
self.assertEqual(u2.followers_count(), 1)
|
||||
u1_following = db.session.scalars(u1.following.select()).all()
|
||||
u2_followers = db.session.scalars(u2.followers.select()).all()
|
||||
self.assertEqual(u1_following[0].username, 'susan')
|
||||
self.assertEqual(u2_followers[0].username, 'john')
|
||||
|
||||
u1.unfollow(u2)
|
||||
db.session.commit()
|
||||
self.assertFalse(u1.is_following(u2))
|
||||
self.assertEqual(u1.following_count(), 0)
|
||||
self.assertEqual(u2.followers_count(), 0)
|
||||
|
||||
def test_follow_posts(self):
|
||||
# create four users
|
||||
u1 = User(username='john', email='john@example.com')
|
||||
u2 = User(username='susan', email='susan@example.com')
|
||||
u3 = User(username='mary', email='mary@example.com')
|
||||
u4 = User(username='david', email='david@example.com')
|
||||
db.session.add_all([u1, u2, u3, u4])
|
||||
|
||||
# create four posts
|
||||
now = datetime.now(timezone.utc)
|
||||
p1 = Post(body="post from john", author=u1,
|
||||
timestamp=now + timedelta(seconds=1))
|
||||
p2 = Post(body="post from susan", author=u2,
|
||||
timestamp=now + timedelta(seconds=4))
|
||||
p3 = Post(body="post from mary", author=u3,
|
||||
timestamp=now + timedelta(seconds=3))
|
||||
p4 = Post(body="post from david", author=u4,
|
||||
timestamp=now + timedelta(seconds=2))
|
||||
db.session.add_all([p1, p2, p3, p4])
|
||||
db.session.commit()
|
||||
|
||||
# setup the followers
|
||||
u1.follow(u2) # john follows susan
|
||||
u1.follow(u4) # john follows david
|
||||
u2.follow(u3) # susan follows mary
|
||||
u3.follow(u4) # mary follows david
|
||||
db.session.commit()
|
||||
|
||||
# check the following posts of each user
|
||||
f1 = db.session.scalars(u1.following_posts()).all()
|
||||
f2 = db.session.scalars(u2.following_posts()).all()
|
||||
f3 = db.session.scalars(u3.following_posts()).all()
|
||||
f4 = db.session.scalars(u4.following_posts()).all()
|
||||
self.assertEqual(f1, [p2, p4, p1])
|
||||
self.assertEqual(f2, [p2, p3])
|
||||
self.assertEqual(f3, [p3, p4])
|
||||
self.assertEqual(f4, [p4])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(verbosity=2)
|
||||
Loading…
x
Reference in New Issue
Block a user