news 2026/5/7 19:38:21

社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

我将为您创建一个完整的社区公告推送小程序。以下是完整的项目结构:

项目结构

community_announcement_system/

├── main.py # 主程序入口

├── models.py # 数据模型模块

├── announcement_manager.py # 公告管理核心模块

├── user_manager.py # 用户管理模块

├── notification_service.py # 推送通知服务模块

├── database.py # 数据库操作模块

├── utils.py # 工具函数模块

├── config.py # 配置文件

├── templates.py # HTML模板模块

├── static/

│ ├── style.css # 样式文件

│ └── script.js # JavaScript文件

├── data/

│ └── community_data.json # 数据存储文件

├── README.md # 使用说明

├── requirements.txt # 依赖包列表

└── demo.py # 演示程序

1. requirements.txt

flask==2.3.3

flask-sqlalchemy==3.0.5

flask-login==0.6.3

werkzeug==2.3.7

jinja2==3.1.2

wtforms==3.0.1

email-validator==2.0.0

python-dateutil==2.8.2

sqlite3

2. config.py

"""

配置文件 - 存储应用程序的配置参数

基于创新创业理论:标准化配置提升系统稳定性

"""

import os

from datetime import timedelta

class Config:

# 基础配置

SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'

# 数据库配置

DATABASE_URL = os.environ.get('DATABASE_URL') or 'sqlite:///community.db'

# 应用配置

APP_NAME = "智慧社区公告系统"

APP_DESCRIPTION = "高效、便捷的社区信息推送平台"

# 推送配置

NOTIFICATION_ENABLED = True

PUSH_DELAY_SECONDS = 2 # 推送延迟时间

# 分页配置

ANNOUNCEMENTS_PER_PAGE = 10

COMMENTS_PER_PAGE = 5

# 权限配置

MAX_TITLE_LENGTH = 100

MAX_CONTENT_LENGTH = 2000

MAX_COMMENT_LENGTH = 500

# 时间配置

ANNOUNCEMENT_EXPIRY_DAYS = 30 # 公告过期天数

SESSION_TIMEOUT = timedelta(hours=24)

# 用户角色

ROLES = {

'admin': '管理员',

'resident': '居民',

'committee': '业委会成员'

}

# 公告类型

ANNOUNCEMENT_TYPES = {

'notice': '通知公告',

'activity': '活动通知',

'emergency': '紧急通知',

'service': '物业服务',

'other': '其他'

}

# 优先级等级

PRIORITY_LEVELS = {

'low': '普通',

'medium': '重要',

'high': '紧急',

'urgent': '特急'

}

class DevelopmentConfig(Config):

DEBUG = True

TESTING = True

class ProductionConfig(Config):

DEBUG = False

TESTING = False

# 配置映射

config = {

'development': DevelopmentConfig,

'production': ProductionConfig,

'default': DevelopmentConfig

}

3. models.py

"""

数据模型模块 - 定义系统的核心数据结构和关系

基于创新创业理论:数据模型是产品的骨架

"""

from flask_sqlalchemy import SQLAlchemy

from flask_login import UserMixin

from werkzeug.security import generate_password_hash, check_password_hash

from datetime import datetime

import json

db = SQLAlchemy()

class User(UserMixin, db.Model):

"""用户模型 - 存储社区居民信息"""

__tablename__ = 'users'

id = db.Column(db.Integer, primary_key=True)

username = db.Column(db.String(80), unique=True, nullable=False, comment='用户名')

email = db.Column(db.String(120), unique=True, nullable=False, comment='邮箱')

password_hash = db.Column(db.String(128), nullable=False, comment='密码哈希')

real_name = db.Column(db.String(50), nullable=False, comment='真实姓名')

phone = db.Column(db.String(20), comment='联系电话')

room_number = db.Column(db.String(20), comment='房间号')

role = db.Column(db.String(20), default='resident', comment='用户角色')

# 关联关系

announcements = db.relationship('Announcement', backref='author', lazy=True)

comments = db.relationship('Comment', backref='user', lazy=True)

likes = db.relationship('Like', backref='user', lazy=True)

# 时间戳

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

def set_password(self, password):

"""设置密码"""

self.password_hash = generate_password_hash(password)

def check_password(self, password):

"""验证密码"""

return check_password_hash(self.password_hash, password)

def get_avatar_url(self):

"""获取头像URL"""

return f"/static/avatars/{self.role}.png"

def to_dict(self, include_private=False):

"""转换为字典格式"""

data = {

'id': self.id,

'username': self.username,

'real_name': self.real_name,

'role': self.role,

'room_number': self.room_number,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_private:

data.update({

'email': self.email,

'phone': self.phone

})

return data

def __repr__(self):

return f'<User {self.username}>'

class Announcement(db.Model):

"""公告模型 - 存储社区公告信息"""

__tablename__ = 'announcements'

id = db.Column(db.Integer, primary_key=True)

title = db.Column(db.String(200), nullable=False, comment='标题')

content = db.Column(db.Text, nullable=False, comment='内容')

type = db.Column(db.String(20), default='notice', comment='类型')

priority = db.Column(db.String(20), default='medium', comment='优先级')

# 作者信息

author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 推送状态

is_published = db.Column(db.Boolean, default=False, comment='是否已发布')

is_pinned = db.Column(db.Boolean, default=False, comment='是否置顶')

push_status = db.Column(db.String(20), default='draft', comment='推送状态')

# 统计数据

view_count = db.Column(db.Integer, default=0, comment='浏览次数')

like_count = db.Column(db.Integer, default=0, comment='点赞数')

comment_count = db.Column(db.Integer, default=0, comment='评论数')

# 时间信息

publish_time = db.Column(db.DateTime, comment='发布时间')

expiry_date = db.Column(db.DateTime, comment='过期时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 关联关系

comments = db.relationship('Comment', backref='announcement', lazy=True, cascade='all, delete-orphan')

likes = db.relationship('Like', backref='announcement', lazy=True, cascade='all, delete-orphan')

def get_type_display(self):

"""获取类型显示名称"""

from config import Config

return Config.ANNOUNCEMENT_TYPES.get(self.type, '未知')

def get_priority_display(self):

"""获取优先级显示名称"""

from config import Config

return Config.PRIORITY_LEVELS.get(self.priority, '普通')

def get_author_name(self):

"""获取作者姓名"""

return self.author.real_name if self.author else '未知'

def is_expired(self):

"""判断是否过期"""

if not self.expiry_date:

return False

return datetime.utcnow() > self.expiry_date

def increment_view_count(self):

"""增加浏览次数"""

self.view_count += 1

db.session.commit()

def to_dict(self, include_content=True):

"""转换为字典格式"""

data = {

'id': self.id,

'title': self.title,

'type': self.type,

'type_display': self.get_type_display(),

'priority': self.priority,

'priority_display': self.get_priority_display(),

'author_name': self.get_author_name(),

'is_published': self.is_published,

'is_pinned': self.is_pinned,

'push_status': self.push_status,

'view_count': self.view_count,

'like_count': self.like_count,

'comment_count': self.comment_count,

'publish_time': self.publish_time.isoformat() if self.publish_time else None,

'expiry_date': self.expiry_date.isoformat() if self.expiry_date else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_content:

data['content'] = self.content

return data

def __repr__(self):

return f'<Announcement {self.title}>'

class Comment(db.Model):

"""评论模型 - 存储用户对公告的评论"""

__tablename__ = 'comments'

id = db.Column(db.Integer, primary_key=True)

content = db.Column(db.Text, nullable=False, comment='评论内容')

is_reply = db.Column(db.Boolean, default=False, comment='是否为回复')

parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'), comment='父评论ID')

# 关联信息

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 自关联关系

replies = db.relationship('Comment', backref=db.backref('parent', remote_side=[id]), lazy=True)

def get_user_name(self):

"""获取用户姓名"""

return self.user.real_name if self.user else '匿名用户'

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'content': self.content,

'is_reply': self.is_reply,

'parent_id': self.parent_id,

'user_name': self.get_user_name(),

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None,

'replies': [reply.to_dict() for reply in self.replies] if self.replies else []

}

def __repr__(self):

return f'<Comment {self.content[:20]}>'

class Like(db.Model):

"""点赞模型 - 存储用户对公告的点赞记录"""

__tablename__ = 'likes'

id = db.Column(db.Integer, primary_key=True)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 唯一约束

__table_args__ = (db.UniqueConstraint('user_id', 'announcement_id', name='unique_user_announcement_like'),)

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<Like user:{self.user_id} announcement:{self.announcement_id}>'

class PushRecord(db.Model):

"""推送记录模型 - 记录公告推送历史"""

__tablename__ = 'push_records'

id = db.Column(db.Integer, primary_key=True)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

recipient_count = db.Column(db.Integer, default=0, comment='接收人数')

success_count = db.Column(db.Integer, default=0, comment='成功推送数')

failure_count = db.Column(db.Integer, default=0, comment='推送失败数')

# 时间信息

push_time = db.Column(db.DateTime, default=datetime.utcnow, comment='推送时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 关联关系

announcement = db.relationship('Announcement', backref='push_records')

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'announcement_id': self.announcement_id,

'recipient_count': self.recipient_count,

'success_count': self.success_count,

'failure_count': self.failure_count,

'push_time': self.push_time.isoformat() if self.push_time else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<PushRecord announcement:{self.announcement_id}>'

4. database.py

"""

数据库操作模块 - 封装所有数据库相关的操作

基于创新创业理论:数据层抽象提升系统可维护性

"""

from sqlalchemy import create_engine, desc, asc, func, and_, or_

from sqlalchemy.orm import sessionmaker, scoped_session

from sqlalchemy.exc import SQLAlchemyError

import logging

from datetime import datetime, timedelta

import json

from models import db, User, Announcement, Comment, Like, PushRecord

from config import Config

class DatabaseManager:

"""数据库管理器 - 统一管理所有数据库操作"""

def __init__(self, app=None):

self.app = app

self.engine = None

self.Session = None

if app:

self.init_app(app)

def init_app(self, app):

"""初始化数据库连接"""

self.app = app

self.engine = create_engine(

app.config['SQLALCHEMY_DATABASE_URI'],

echo=app.config.get('SQLALCHEMY_ECHO', False)

)

self.Session = scoped_session(sessionmaker(bind=self.engine))

db.init_app(app)

def get_session(self):

"""获取数据库会话"""

return self.Session()

def close_session(self, session):

"""关闭数据库会话"""

if session:

session.close()

# 用户相关操作

def create_user(self, user_data):

"""创建新用户"""

session = self.get_session()

try:

# 检查用户名和邮箱是否已存在

existing_user = session.query(User).filter(

or_(

User.username == user_data['username'],

User.email == user_data['email']

)

).first()

if existing_user:

return None, "用户名或邮箱已存在"

user = User(

username=user_data['username'],

email=user_data['email'],

real_name=user_data['real_name'],

phone=user_data.get('phone', ''),

room_number=user_data.get('room_number', ''),

role=user_data.get('role', 'resident')

)

user.set_password(user_data['password'])

session.add(user)

session.commit()

return user, "用户创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建用户失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_user_by_username(self, username):

"""根据用户名获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.username == username).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_user_by_id(self, user_id):

"""根据用户ID获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.id == user_id).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_users_paginated(self, page=1, per_page=20, filters=None):

"""分页获取用户列表"""

session = self.get_session()

try:

query = session.query(User)

# 应用过滤条件

if filters:

if filters.get('role'):

query = query.filter(User.role == filters['role'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

User.username.like(keyword),

User.real_name.like(keyword),

User.room_number.like(keyword)

)

)

# 排序和分页

users = query.order_by(desc(User.created_at)).paginate(

page=page, per_page=per_page, error_out=False

)

return users

except SQLAlchemyError as e:

logging.error(f"获取用户列表失败: {e}")

return None

finally:

self.close_session(session)

# 公告相关操作

def create_announcement(self, announcement_data, author_id):

"""创建新公告"""

session = self.get_session()

try:

# 计算过期时间

expiry_days = announcement_data.get('expiry_days', 30)

expiry_date = datetime.utcnow() + timedelta(days=expiry_days)

announcement = Announcement(

title=announcement_data['title'],

content=announcement_data['content'],

type=announcement_data.get('type', 'notice'),

priority=announcement_data.get('priority', 'medium'),

author_id=author_id,

is_pinned=announcement_data.get('is_pinned', False),

expiry_date=expiry_date

)

session.add(announcement)

session.commit()

return announcement, "公告创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建公告失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_announcement_by_id(self, announcement_id):

"""根据ID获取公告"""

session = self.get_session()

try:

return session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

except SQLAlchemyError as e:

logging.error(f"查询公告失败: {e}")

return None

finally:

self.close_session(session)

def get_announcements_paginated(self, page=1, per_page=10, filters=None, user_id=None):

"""分页获取公告列表"""

session = self.get_session()

try:

query = session.query(Announcement).filter(

Announcement.is_published == True

)

# 排除过期的公告

query = query.filter(

or_(

Announcement.expiry_date.is_(None),

Announcement.expiry_date > datetime.utcnow()

)

)

# 应用过滤条件

if filters:

if filters.get('type'):

query = query.filter(Announcement.type == filters['type'])

if filters.get('priority'):

query = query.filter(Announcement.priority == filters['priority'])

if filters.get('author_id'):

query = query.filter(Announcement.author_id == filters['author_id'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

Announcement.title.like(keyword),

Announcement.content.like(keyword)

)

)

# 排序:置顶的在前,然后按发布时间倒序

query = query.order_by(

desc(Announcement.is_pinned),

desc(Announcement.publish_time)

)

announcements = query.paginate(

page=page, per_page=per_page, error_out=False

)

# 如果用户已登录,标记已读状态

if user_id:

for announcement in announcements.items:

# 这里可以添加已读状态的逻辑

pass

return announcements

except SQLAlchemyError as e:

logging.error(f"获取公告列表失败: {e}")

return None

finally:

self.close_session(session)

def publish_announcement(self, announcement_id):

"""发布公告"""

session = self.get_session()

try:

announcement = session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

if not announcement:

return None, "公告不存在"

announcement.is_published = True

announcement.publish_time = datetime.utcnow()

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/7 17:37:38

什么是网络安全?如何系统学习?这里有一份清晰的自学路径图

什么是网络安全&#xff1f;如何入职&#xff1f; 网络安全是通过技术、管理和法律手段&#xff0c;保护网络系统、数据及用户免受攻击、破坏或非法访问的能力。它如同数字时代的“无形盾牌”&#xff0c;其目标是确保信息的保密性、完整性和可用性&#xff0c;最终保障个人、…

作者头像 李华
网站建设 2026/5/2 10:53:12

从基础到实践:信息系统安全风险防范的十大常用技术剖析

伴随着互联网的发展&#xff0c;它已经成为我们生活中不可或缺的存在&#xff0c;无论是个人还是企业&#xff0c;都离不开互联网。正因为互联网得到了重视&#xff0c;网络安全问题也随之加剧&#xff0c;给我们的信息安全造成严重威胁&#xff0c;而想要有效规避这些风险&…

作者头像 李华
网站建设 2026/4/20 22:24:04

Uni-app App 端自定义导航栏完整实现指南

核心配置&#xff1a;在 pages.json 中设置 navigationStyle: "custom" 开启自定义导航栏 高度适配&#xff1a;通过 uni.getSystemInfoSync() 或 uni.getMenuButtonBoundingClientRect() 获取状态栏 / 导航栏高度&#xff0c;是适配的关键 组件封装&#xff1a;封…

作者头像 李华
网站建设 2026/5/6 0:29:02

Linux如何查看当前的网关配置?

在Linux操作系统中&#xff0c;可以通多种方法来查看网关配置&#xff0c;但最常用的就是通过命令行工具来查看。Linux如何查看当前的网关配置?以下是常用命令&#xff0c;我们来看看吧。1、使用route命令执行以下命令可以查看当前的路由表&#xff1a;route -n在输出结果中&a…

作者头像 李华
网站建设 2026/5/7 8:42:36

基于Spring Boot的煤矿信息管理系统

3 煤矿信息管理系统的设计 煤矿信息、生产入库、销售订单是煤矿信息管理系统的重要组成部分&#xff0c;信息清晰、详细、准确&#xff0c;能够有效地促进煤矿信息管理系统的运行[5]。基础设定函数是对整个系统的总体布局进行合理安排&#xff0c;包括&#xff1b;煤矿信息、生…

作者头像 李华
网站建设 2026/5/3 18:25:02

基于Spring Boot的美食分享系统设计与实现

2系统分析 2.1需求分析 需求分析做为手机软件整体规划环节和项目生命周期的关键一部分&#xff0c;应当是“实现什么东西”而不是“实现”[5]。根据开发者对调研分析关键点、作用、特性、稳定性的掌握&#xff0c;将用户的无形要求转换为有形的界定&#xff0c;以便确定系统的运…

作者头像 李华