From b11f0b62d6a4709ac349105a23c167fd778fd64e Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 19 Nov 2017 23:57:09 -0800 Subject: [PATCH] Chapter 22: Background Jobs (v0.22) --- Procfile | 1 + app/__init__.py | 4 ++ app/email.py | 13 +++-- app/main/routes.py | 11 ++++ app/models.py | 41 +++++++++++++++ app/tasks.py | 56 +++++++++++++++++++++ app/templates/base.html | 31 +++++++++++- app/templates/email/export_posts.html | 4 ++ app/templates/email/export_posts.txt | 7 +++ app/templates/user.html | 3 ++ app/translations/es/LC_MESSAGES/messages.po | 28 ++++++++--- config.py | 1 + deployment/supervisor/microblog-tasks.conf | 9 ++++ microblog.py | 4 +- migrations/versions/c81bac34faab_tasks.py | 38 ++++++++++++++ requirements.txt | 4 ++ 16 files changed, 240 insertions(+), 15 deletions(-) create mode 100644 app/tasks.py create mode 100644 app/templates/email/export_posts.html create mode 100644 app/templates/email/export_posts.txt create mode 100644 deployment/supervisor/microblog-tasks.conf create mode 100644 migrations/versions/c81bac34faab_tasks.py diff --git a/Procfile b/Procfile index 216c639..62bc894 100644 --- a/Procfile +++ b/Procfile @@ -1 +1,2 @@ web: flask db upgrade; flask translate compile; gunicorn microblog:app +worker: rq worker microblog-tasks diff --git a/app/__init__.py b/app/__init__.py index de8d6a0..235daf3 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -9,6 +9,8 @@ from flask_mail import Mail from flask_moment import Moment from flask_babel import Babel, lazy_gettext as _l from elasticsearch import Elasticsearch +from redis import Redis +import rq from config import Config @@ -38,6 +40,8 @@ def create_app(config_class=Config): babel.init_app(app, locale_selector=get_locale) app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \ if app.config['ELASTICSEARCH_URL'] else None + app.redis = Redis.from_url(app.config['REDIS_URL']) + app.task_queue = rq.Queue('microblog-tasks', connection=app.redis) from app.errors import bp as errors_bp app.register_blueprint(errors_bp) diff --git a/app/email.py b/app/email.py index ee23da8..2375cea 100644 --- a/app/email.py +++ b/app/email.py @@ -9,9 +9,16 @@ def send_async_email(app, msg): mail.send(msg) -def send_email(subject, sender, recipients, text_body, html_body): +def send_email(subject, sender, recipients, text_body, html_body, + attachments=None, sync=False): msg = Message(subject, sender=sender, recipients=recipients) msg.body = text_body msg.html = html_body - Thread(target=send_async_email, - args=(current_app._get_current_object(), msg)).start() + if attachments: + for attachment in attachments: + msg.attach(*attachment) + if sync: + mail.send(msg) + else: + Thread(target=send_async_email, + args=(current_app._get_current_object(), msg)).start() diff --git a/app/main/routes.py b/app/main/routes.py index c06f42f..103bbc7 100644 --- a/app/main/routes.py +++ b/app/main/routes.py @@ -216,6 +216,17 @@ def messages(): next_url=next_url, prev_url=prev_url) +@bp.route('/export_posts') +@login_required +def export_posts(): + if current_user.get_task_in_progress('export_posts'): + flash(_('An export task is currently in progress')) + else: + current_user.launch_task('export_posts', _('Exporting posts...')) + db.session.commit() + return redirect(url_for('main.user', username=current_user.username)) + + @bp.route('/notifications') @login_required def notifications(): diff --git a/app/models.py b/app/models.py index 0d10d7c..a3b4d53 100644 --- a/app/models.py +++ b/app/models.py @@ -9,6 +9,8 @@ from flask import current_app from flask_login import UserMixin from werkzeug.security import generate_password_hash, check_password_hash import jwt +import redis +import rq from app import db, login from app.search import add_to_index, remove_from_index, query_index @@ -95,6 +97,7 @@ class User(UserMixin, db.Model): foreign_keys='Message.recipient_id', back_populates='recipient') notifications: so.WriteOnlyMapped['Notification'] = so.relationship( back_populates='user') + tasks: so.WriteOnlyMapped['Task'] = so.relationship(back_populates='user') def __repr__(self): return ''.format(self.username) @@ -174,6 +177,23 @@ class User(UserMixin, db.Model): db.session.add(n) return n + def launch_task(self, name, description, *args, **kwargs): + rq_job = current_app.task_queue.enqueue(f'app.tasks.{name}', self.id, + *args, **kwargs) + task = Task(id=rq_job.get_id(), name=name, description=description, + user=self) + db.session.add(task) + return task + + def get_tasks_in_progress(self): + query = self.tasks.select().where(Task.complete == False) + return db.session.scalars(query) + + def get_task_in_progress(self, name): + query = self.tasks.select().where(Task.name == name, + Task.complete == False) + return db.session.scalar(query) + @login.user_loader def load_user(id): @@ -229,3 +249,24 @@ class Notification(db.Model): def get_data(self): return json.loads(str(self.payload_json)) + + +class Task(db.Model): + id: so.Mapped[str] = so.mapped_column(sa.String(36), primary_key=True) + name: so.Mapped[str] = so.mapped_column(sa.String(128), index=True) + description: so.Mapped[Optional[str]] = so.mapped_column(sa.String(128)) + user_id: so.Mapped[int] = so.mapped_column(sa.ForeignKey(User.id)) + complete: so.Mapped[bool] = so.mapped_column(default=False) + + user: so.Mapped[User] = so.relationship(back_populates='tasks') + + def get_rq_job(self): + try: + rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis) + except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError): + return None + return rq_job + + def get_progress(self): + job = self.get_rq_job() + return job.meta.get('progress', 0) if job is not None else 100 diff --git a/app/tasks.py b/app/tasks.py new file mode 100644 index 0000000..356079e --- /dev/null +++ b/app/tasks.py @@ -0,0 +1,56 @@ +import json +import sys +import time +import sqlalchemy as sa +from flask import render_template +from rq import get_current_job +from app import create_app, db +from app.models import User, Post, Task +from app.email import send_email + +app = create_app() +app.app_context().push() + + +def _set_task_progress(progress): + job = get_current_job() + if job: + job.meta['progress'] = progress + job.save_meta() + task = db.session.get(Task, job.get_id()) + task.user.add_notification('task_progress', {'task_id': job.get_id(), + 'progress': progress}) + if progress >= 100: + task.complete = True + db.session.commit() + + +def export_posts(user_id): + try: + user = db.session.get(User, user_id) + _set_task_progress(0) + data = [] + i = 0 + total_posts = db.session.scalar(sa.select(sa.func.count()).select_from( + user.posts.select().subquery())) + for post in db.session.scalars(user.posts.select().order_by( + Post.timestamp.asc())): + data.append({'body': post.body, + 'timestamp': post.timestamp.isoformat() + 'Z'}) + time.sleep(5) + i += 1 + _set_task_progress(100 * i // total_posts) + + send_email( + '[Microblog] Your blog posts', + sender=app.config['ADMINS'][0], recipients=[user.email], + text_body=render_template('email/export_posts.txt', user=user), + html_body=render_template('email/export_posts.html', user=user), + attachments=[('posts.json', 'application/json', + json.dumps({'posts': data}, indent=4))], + sync=True) + except Exception: + _set_task_progress(100) + app.logger.error('Unhandled exception', exc_info=sys.exc_info()) + finally: + _set_task_progress(100) diff --git a/app/templates/base.html b/app/templates/base.html index aa50e1d..4eca8da 100644 --- a/app/templates/base.html +++ b/app/templates/base.html @@ -65,6 +65,19 @@
+ {% if current_user.is_authenticated %} + {% with tasks = current_user.get_tasks_in_progress() %} + {% if tasks %} + {% for task in tasks %} + + {% endfor %} + {% endif %} + {% endwith %} + {% endif %} + {% with messages = get_flashed_messages() %} {% if messages %} {% for message in messages %} @@ -134,6 +147,13 @@ count.style.visibility = n ? 'visible' : 'hidden'; } + function set_task_progress(task_id, progress) { + const progressElement = document.getElementById(task_id + '-progress'); + if (progressElement) { + progressElement.innerText = progress; + } + } + {% if current_user.is_authenticated %} function initialize_notifications() { let since = 0; @@ -141,8 +161,15 @@ const response = await fetch('{{ url_for('main.notifications') }}?since=' + since); const notifications = await response.json(); for (let i = 0; i < notifications.length; i++) { - if (notifications[i].name == 'unread_message_count') - set_message_count(notifications[i].data); + switch (notifications[i].name) { + case 'unread_message_count': + set_message_count(notifications[i].data); + break; + case 'task_progress': + set_task_progress(notifications[i].data.task_id, + notifications[i].data.progress); + break; + } since = notifications[i].timestamp; } }, 10000); diff --git a/app/templates/email/export_posts.html b/app/templates/email/export_posts.html new file mode 100644 index 0000000..f98383a --- /dev/null +++ b/app/templates/email/export_posts.html @@ -0,0 +1,4 @@ +

Dear {{ user.username }},

+

Please find attached the archive of your posts that you requested.

+

Sincerely,

+

The Microblog Team

diff --git a/app/templates/email/export_posts.txt b/app/templates/email/export_posts.txt new file mode 100644 index 0000000..81c9f7a --- /dev/null +++ b/app/templates/email/export_posts.txt @@ -0,0 +1,7 @@ +Dear {{ user.username }}, + +Please find attached the archive of your posts that you requested. + +Sincerely, + +The Microblog Team diff --git a/app/templates/user.html b/app/templates/user.html index 7060efe..c973426 100644 --- a/app/templates/user.html +++ b/app/templates/user.html @@ -13,6 +13,9 @@

{{ _('%(count)d followers', count=user.followers_count()) }}, {{ _('%(count)d following', count=user.following_count()) }}

{% if user == current_user %}

{{ _('Edit your profile') }}

+ {% if not current_user.get_task_in_progress('export_posts') %} +

{{ _('Export your posts') }}

+ {% endif %} {% elif not current_user.is_following(user) %}

diff --git a/app/translations/es/LC_MESSAGES/messages.po b/app/translations/es/LC_MESSAGES/messages.po index dac4264..ad0e8ed 100644 --- a/app/translations/es/LC_MESSAGES/messages.po +++ b/app/translations/es/LC_MESSAGES/messages.po @@ -7,7 +7,7 @@ msgid "" msgstr "" "Project-Id-Version: PROJECT VERSION\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2017-11-25 18:26-0800\n" +"POT-Creation-Date: 2017-11-25 18:27-0800\n" "PO-Revision-Date: 2017-09-29 23:25-0700\n" "Last-Translator: FULL NAME \n" "Language: es\n" @@ -18,7 +18,7 @@ msgstr "" "Content-Transfer-Encoding: 8bit\n" "Generated-By: Babel 2.5.1\n" -#: app/__init__.py:18 +#: app/__init__.py:20 msgid "Please log in to access this page." msgstr "Por favor ingrese para acceder a esta página." @@ -153,6 +153,14 @@ msgstr "Tu mensaje ha sido enviado." msgid "Send Message" msgstr "Enviar Mensaje" +#: app/main/routes.py:197 +msgid "An export task is currently in progress" +msgstr "Una tarea de exportación esta en progreso" + +#: app/main/routes.py:199 +msgid "Exporting posts..." +msgstr "Exportando artículos..." + #: app/templates/_post.html:16 #, python-format msgid "%(username)s said %(when)s" @@ -190,7 +198,7 @@ msgstr "Perfil" msgid "Logout" msgstr "Salir" -#: app/templates/base.html:83 +#: app/templates/base.html:95 msgid "Error: Could not contact server." msgstr "Error: el servidor no pudo ser contactado." @@ -199,11 +207,11 @@ msgstr "Error: el servidor no pudo ser contactado." msgid "Hi, %(username)s!" msgstr "¡Hola, %(username)s!" -#: app/templates/index.html:17 app/templates/user.html:34 +#: app/templates/index.html:17 app/templates/user.html:37 msgid "Newer posts" msgstr "Artículos siguientes" -#: app/templates/index.html:22 app/templates/user.html:39 +#: app/templates/index.html:22 app/templates/user.html:42 msgid "Older posts" msgstr "Artículos previos" @@ -254,15 +262,19 @@ msgstr "siguiendo a %(count)d" msgid "Edit your profile" msgstr "Editar tu perfil" -#: app/templates/user.html:17 app/templates/user_popup.html:14 +#: app/templates/user.html:17 +msgid "Export your posts" +msgstr "Exportar tus artículos" + +#: app/templates/user.html:20 app/templates/user_popup.html:14 msgid "Follow" msgstr "Seguir" -#: app/templates/user.html:19 app/templates/user_popup.html:16 +#: app/templates/user.html:22 app/templates/user_popup.html:16 msgid "Unfollow" msgstr "Dejar de seguir" -#: app/templates/user.html:22 +#: app/templates/user.html:25 msgid "Send private message" msgstr "Enviar mensaje privado" diff --git a/config.py b/config.py index 00e90de..7e50c9e 100644 --- a/config.py +++ b/config.py @@ -21,4 +21,5 @@ class Config: LANGUAGES = ['en', 'es'] MS_TRANSLATOR_KEY = os.environ.get('MS_TRANSLATOR_KEY') ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL') + REDIS_URL = os.environ.get('REDIS_URL') or 'redis://' POSTS_PER_PAGE = 25 diff --git a/deployment/supervisor/microblog-tasks.conf b/deployment/supervisor/microblog-tasks.conf new file mode 100644 index 0000000..d47b6da --- /dev/null +++ b/deployment/supervisor/microblog-tasks.conf @@ -0,0 +1,9 @@ +[program:microblog-tasks] +command=/home/ubuntu/microblog/venv/bin/rq worker microblog-tasks +numprocs=1 +directory=/home/ubuntu/microblog +user=ubuntu +autostart=true +autorestart=true +stopasgroup=true +killasgroup=true diff --git a/microblog.py b/microblog.py index 5f4baa0..458e932 100644 --- a/microblog.py +++ b/microblog.py @@ -1,7 +1,7 @@ import sqlalchemy as sa import sqlalchemy.orm as so from app import create_app, db -from app.models import User, Post, Message, Notification +from app.models import User, Post, Message, Notification, Task app = create_app() @@ -9,4 +9,4 @@ app = create_app() @app.shell_context_processor def make_shell_context(): return {'sa': sa, 'so': so, 'db': db, 'User': User, 'Post': Post, - 'Message': Message, 'Notification': Notification} + 'Message': Message, 'Notification': Notification, 'Task': Task} diff --git a/migrations/versions/c81bac34faab_tasks.py b/migrations/versions/c81bac34faab_tasks.py new file mode 100644 index 0000000..3b96cf7 --- /dev/null +++ b/migrations/versions/c81bac34faab_tasks.py @@ -0,0 +1,38 @@ +"""tasks + +Revision ID: c81bac34faab +Revises: f7ac3d27bb1d +Create Date: 2017-11-23 10:56:49.599779 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'c81bac34faab' +down_revision = 'f7ac3d27bb1d' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('task', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('name', sa.String(length=128), nullable=False), + sa.Column('description', sa.String(length=128), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('complete', sa.Boolean(), nullable=False), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_task_name'), 'task', ['name'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_task_name'), table_name='task') + op.drop_table('task') + # ### end Alembic commands ### diff --git a/requirements.txt b/requirements.txt index 748314d..dfa38cc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,6 +20,7 @@ Flask-Moment==1.0.5 Flask-SQLAlchemy==3.1.1 Flask-WTF==1.2.1 greenlet==3.0.1 +gunicorn==21.2.0 idna==3.4 itsdangerous==2.1.2 Jinja2==3.1.2 @@ -27,10 +28,13 @@ langdetect==1.0.9 Mako==1.3.0 MarkupSafe==2.1.3 packaging==23.2 +psycopg2-binary==2.9.9 PyJWT==2.8.0 python-dotenv==1.0.0 pytz==2023.3.post1 +redis==5.0.1 requests==2.31.0 +rq==1.15.1 setuptools==68.2.2 six==1.16.0 SQLAlchemy==2.0.23