news 2026/4/18 10:51:29

社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

我将为您创建一个完整的社区公告推送小程序。以下是完整的项目结构:

项目结构

community_announcement_system/

├── main.py # 主程序入口

├── models.py # 数据模型模块

├── announcement_manager.py # 公告管理核心模块

├── user_manager.py # 用户管理模块

├── notification_service.py # 推送通知服务模块

├── database.py # 数据库操作模块

├── utils.py # 工具函数模块

├── config.py # 配置文件

├── templates.py # HTML模板模块

├── static/

│ ├── style.css # 样式文件

│ └── script.js # JavaScript文件

├── data/

│ └── community_data.json # 数据存储文件

├── README.md # 使用说明

├── requirements.txt # 依赖包列表

└── demo.py # 演示程序

1. requirements.txt

flask==2.3.3

flask-sqlalchemy==3.0.5

flask-login==0.6.3

werkzeug==2.3.7

jinja2==3.1.2

wtforms==3.0.1

email-validator==2.0.0

python-dateutil==2.8.2

sqlite3

2. config.py

"""

配置文件 - 存储应用程序的配置参数

基于创新创业理论:标准化配置提升系统稳定性

"""

import os

from datetime import timedelta

class Config:

# 基础配置

SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'

# 数据库配置

DATABASE_URL = os.environ.get('DATABASE_URL') or 'sqlite:///community.db'

# 应用配置

APP_NAME = "智慧社区公告系统"

APP_DESCRIPTION = "高效、便捷的社区信息推送平台"

# 推送配置

NOTIFICATION_ENABLED = True

PUSH_DELAY_SECONDS = 2 # 推送延迟时间

# 分页配置

ANNOUNCEMENTS_PER_PAGE = 10

COMMENTS_PER_PAGE = 5

# 权限配置

MAX_TITLE_LENGTH = 100

MAX_CONTENT_LENGTH = 2000

MAX_COMMENT_LENGTH = 500

# 时间配置

ANNOUNCEMENT_EXPIRY_DAYS = 30 # 公告过期天数

SESSION_TIMEOUT = timedelta(hours=24)

# 用户角色

ROLES = {

'admin': '管理员',

'resident': '居民',

'committee': '业委会成员'

}

# 公告类型

ANNOUNCEMENT_TYPES = {

'notice': '通知公告',

'activity': '活动通知',

'emergency': '紧急通知',

'service': '物业服务',

'other': '其他'

}

# 优先级等级

PRIORITY_LEVELS = {

'low': '普通',

'medium': '重要',

'high': '紧急',

'urgent': '特急'

}

class DevelopmentConfig(Config):

DEBUG = True

TESTING = True

class ProductionConfig(Config):

DEBUG = False

TESTING = False

# 配置映射

config = {

'development': DevelopmentConfig,

'production': ProductionConfig,

'default': DevelopmentConfig

}

3. models.py

"""

数据模型模块 - 定义系统的核心数据结构和关系

基于创新创业理论:数据模型是产品的骨架

"""

from flask_sqlalchemy import SQLAlchemy

from flask_login import UserMixin

from werkzeug.security import generate_password_hash, check_password_hash

from datetime import datetime

import json

db = SQLAlchemy()

class User(UserMixin, db.Model):

"""用户模型 - 存储社区居民信息"""

__tablename__ = 'users'

id = db.Column(db.Integer, primary_key=True)

username = db.Column(db.String(80), unique=True, nullable=False, comment='用户名')

email = db.Column(db.String(120), unique=True, nullable=False, comment='邮箱')

password_hash = db.Column(db.String(128), nullable=False, comment='密码哈希')

real_name = db.Column(db.String(50), nullable=False, comment='真实姓名')

phone = db.Column(db.String(20), comment='联系电话')

room_number = db.Column(db.String(20), comment='房间号')

role = db.Column(db.String(20), default='resident', comment='用户角色')

# 关联关系

announcements = db.relationship('Announcement', backref='author', lazy=True)

comments = db.relationship('Comment', backref='user', lazy=True)

likes = db.relationship('Like', backref='user', lazy=True)

# 时间戳

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

def set_password(self, password):

"""设置密码"""

self.password_hash = generate_password_hash(password)

def check_password(self, password):

"""验证密码"""

return check_password_hash(self.password_hash, password)

def get_avatar_url(self):

"""获取头像URL"""

return f"/static/avatars/{self.role}.png"

def to_dict(self, include_private=False):

"""转换为字典格式"""

data = {

'id': self.id,

'username': self.username,

'real_name': self.real_name,

'role': self.role,

'room_number': self.room_number,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_private:

data.update({

'email': self.email,

'phone': self.phone

})

return data

def __repr__(self):

return f'<User {self.username}>'

class Announcement(db.Model):

"""公告模型 - 存储社区公告信息"""

__tablename__ = 'announcements'

id = db.Column(db.Integer, primary_key=True)

title = db.Column(db.String(200), nullable=False, comment='标题')

content = db.Column(db.Text, nullable=False, comment='内容')

type = db.Column(db.String(20), default='notice', comment='类型')

priority = db.Column(db.String(20), default='medium', comment='优先级')

# 作者信息

author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 推送状态

is_published = db.Column(db.Boolean, default=False, comment='是否已发布')

is_pinned = db.Column(db.Boolean, default=False, comment='是否置顶')

push_status = db.Column(db.String(20), default='draft', comment='推送状态')

# 统计数据

view_count = db.Column(db.Integer, default=0, comment='浏览次数')

like_count = db.Column(db.Integer, default=0, comment='点赞数')

comment_count = db.Column(db.Integer, default=0, comment='评论数')

# 时间信息

publish_time = db.Column(db.DateTime, comment='发布时间')

expiry_date = db.Column(db.DateTime, comment='过期时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 关联关系

comments = db.relationship('Comment', backref='announcement', lazy=True, cascade='all, delete-orphan')

likes = db.relationship('Like', backref='announcement', lazy=True, cascade='all, delete-orphan')

def get_type_display(self):

"""获取类型显示名称"""

from config import Config

return Config.ANNOUNCEMENT_TYPES.get(self.type, '未知')

def get_priority_display(self):

"""获取优先级显示名称"""

from config import Config

return Config.PRIORITY_LEVELS.get(self.priority, '普通')

def get_author_name(self):

"""获取作者姓名"""

return self.author.real_name if self.author else '未知'

def is_expired(self):

"""判断是否过期"""

if not self.expiry_date:

return False

return datetime.utcnow() > self.expiry_date

def increment_view_count(self):

"""增加浏览次数"""

self.view_count += 1

db.session.commit()

def to_dict(self, include_content=True):

"""转换为字典格式"""

data = {

'id': self.id,

'title': self.title,

'type': self.type,

'type_display': self.get_type_display(),

'priority': self.priority,

'priority_display': self.get_priority_display(),

'author_name': self.get_author_name(),

'is_published': self.is_published,

'is_pinned': self.is_pinned,

'push_status': self.push_status,

'view_count': self.view_count,

'like_count': self.like_count,

'comment_count': self.comment_count,

'publish_time': self.publish_time.isoformat() if self.publish_time else None,

'expiry_date': self.expiry_date.isoformat() if self.expiry_date else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_content:

data['content'] = self.content

return data

def __repr__(self):

return f'<Announcement {self.title}>'

class Comment(db.Model):

"""评论模型 - 存储用户对公告的评论"""

__tablename__ = 'comments'

id = db.Column(db.Integer, primary_key=True)

content = db.Column(db.Text, nullable=False, comment='评论内容')

is_reply = db.Column(db.Boolean, default=False, comment='是否为回复')

parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'), comment='父评论ID')

# 关联信息

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 自关联关系

replies = db.relationship('Comment', backref=db.backref('parent', remote_side=[id]), lazy=True)

def get_user_name(self):

"""获取用户姓名"""

return self.user.real_name if self.user else '匿名用户'

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'content': self.content,

'is_reply': self.is_reply,

'parent_id': self.parent_id,

'user_name': self.get_user_name(),

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None,

'replies': [reply.to_dict() for reply in self.replies] if self.replies else []

}

def __repr__(self):

return f'<Comment {self.content[:20]}>'

class Like(db.Model):

"""点赞模型 - 存储用户对公告的点赞记录"""

__tablename__ = 'likes'

id = db.Column(db.Integer, primary_key=True)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 唯一约束

__table_args__ = (db.UniqueConstraint('user_id', 'announcement_id', name='unique_user_announcement_like'),)

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<Like user:{self.user_id} announcement:{self.announcement_id}>'

class PushRecord(db.Model):

"""推送记录模型 - 记录公告推送历史"""

__tablename__ = 'push_records'

id = db.Column(db.Integer, primary_key=True)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

recipient_count = db.Column(db.Integer, default=0, comment='接收人数')

success_count = db.Column(db.Integer, default=0, comment='成功推送数')

failure_count = db.Column(db.Integer, default=0, comment='推送失败数')

# 时间信息

push_time = db.Column(db.DateTime, default=datetime.utcnow, comment='推送时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 关联关系

announcement = db.relationship('Announcement', backref='push_records')

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'announcement_id': self.announcement_id,

'recipient_count': self.recipient_count,

'success_count': self.success_count,

'failure_count': self.failure_count,

'push_time': self.push_time.isoformat() if self.push_time else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<PushRecord announcement:{self.announcement_id}>'

4. database.py

"""

数据库操作模块 - 封装所有数据库相关的操作

基于创新创业理论:数据层抽象提升系统可维护性

"""

from sqlalchemy import create_engine, desc, asc, func, and_, or_

from sqlalchemy.orm import sessionmaker, scoped_session

from sqlalchemy.exc import SQLAlchemyError

import logging

from datetime import datetime, timedelta

import json

from models import db, User, Announcement, Comment, Like, PushRecord

from config import Config

class DatabaseManager:

"""数据库管理器 - 统一管理所有数据库操作"""

def __init__(self, app=None):

self.app = app

self.engine = None

self.Session = None

if app:

self.init_app(app)

def init_app(self, app):

"""初始化数据库连接"""

self.app = app

self.engine = create_engine(

app.config['SQLALCHEMY_DATABASE_URI'],

echo=app.config.get('SQLALCHEMY_ECHO', False)

)

self.Session = scoped_session(sessionmaker(bind=self.engine))

db.init_app(app)

def get_session(self):

"""获取数据库会话"""

return self.Session()

def close_session(self, session):

"""关闭数据库会话"""

if session:

session.close()

# 用户相关操作

def create_user(self, user_data):

"""创建新用户"""

session = self.get_session()

try:

# 检查用户名和邮箱是否已存在

existing_user = session.query(User).filter(

or_(

User.username == user_data['username'],

User.email == user_data['email']

)

).first()

if existing_user:

return None, "用户名或邮箱已存在"

user = User(

username=user_data['username'],

email=user_data['email'],

real_name=user_data['real_name'],

phone=user_data.get('phone', ''),

room_number=user_data.get('room_number', ''),

role=user_data.get('role', 'resident')

)

user.set_password(user_data['password'])

session.add(user)

session.commit()

return user, "用户创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建用户失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_user_by_username(self, username):

"""根据用户名获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.username == username).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_user_by_id(self, user_id):

"""根据用户ID获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.id == user_id).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_users_paginated(self, page=1, per_page=20, filters=None):

"""分页获取用户列表"""

session = self.get_session()

try:

query = session.query(User)

# 应用过滤条件

if filters:

if filters.get('role'):

query = query.filter(User.role == filters['role'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

User.username.like(keyword),

User.real_name.like(keyword),

User.room_number.like(keyword)

)

)

# 排序和分页

users = query.order_by(desc(User.created_at)).paginate(

page=page, per_page=per_page, error_out=False

)

return users

except SQLAlchemyError as e:

logging.error(f"获取用户列表失败: {e}")

return None

finally:

self.close_session(session)

# 公告相关操作

def create_announcement(self, announcement_data, author_id):

"""创建新公告"""

session = self.get_session()

try:

# 计算过期时间

expiry_days = announcement_data.get('expiry_days', 30)

expiry_date = datetime.utcnow() + timedelta(days=expiry_days)

announcement = Announcement(

title=announcement_data['title'],

content=announcement_data['content'],

type=announcement_data.get('type', 'notice'),

priority=announcement_data.get('priority', 'medium'),

author_id=author_id,

is_pinned=announcement_data.get('is_pinned', False),

expiry_date=expiry_date

)

session.add(announcement)

session.commit()

return announcement, "公告创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建公告失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_announcement_by_id(self, announcement_id):

"""根据ID获取公告"""

session = self.get_session()

try:

return session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

except SQLAlchemyError as e:

logging.error(f"查询公告失败: {e}")

return None

finally:

self.close_session(session)

def get_announcements_paginated(self, page=1, per_page=10, filters=None, user_id=None):

"""分页获取公告列表"""

session = self.get_session()

try:

query = session.query(Announcement).filter(

Announcement.is_published == True

)

# 排除过期的公告

query = query.filter(

or_(

Announcement.expiry_date.is_(None),

Announcement.expiry_date > datetime.utcnow()

)

)

# 应用过滤条件

if filters:

if filters.get('type'):

query = query.filter(Announcement.type == filters['type'])

if filters.get('priority'):

query = query.filter(Announcement.priority == filters['priority'])

if filters.get('author_id'):

query = query.filter(Announcement.author_id == filters['author_id'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

Announcement.title.like(keyword),

Announcement.content.like(keyword)

)

)

# 排序:置顶的在前,然后按发布时间倒序

query = query.order_by(

desc(Announcement.is_pinned),

desc(Announcement.publish_time)

)

announcements = query.paginate(

page=page, per_page=per_page, error_out=False

)

# 如果用户已登录,标记已读状态

if user_id:

for announcement in announcements.items:

# 这里可以添加已读状态的逻辑

pass

return announcements

except SQLAlchemyError as e:

logging.error(f"获取公告列表失败: {e}")

return None

finally:

self.close_session(session)

def publish_announcement(self, announcement_id):

"""发布公告"""

session = self.get_session()

try:

announcement = session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

if not announcement:

return None, "公告不存在"

announcement.is_published = True

announcement.publish_time = datetime.utcnow()

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 4:00:09

动态特征选择稳住房颤预警

&#x1f4dd; 博客主页&#xff1a;jaxzheng的CSDN主页 医疗数据科学&#xff1a;从海量数据到精准医疗的革命目录医疗数据科学&#xff1a;从海量数据到精准医疗的革命 引言 数据来源&#xff1a;医疗数据的海洋与价值挖掘 数据清洗与预处理的实践 分析技术&#xff1a;从统…

作者头像 李华
网站建设 2026/4/18 4:00:01

前端性能优化工程化落地指南:从基础实践到极致性能突破

前端性能优化工程化落地指南&#xff1a;从基础实践到极致性能突破 1. 前端性能优化的核心可做功项 前端性能优化是一个系统工程&#xff0c;覆盖从代码编写到资源加载、从解析渲染到运行时交互的全链路。以下是现代前端项目常见的性能优化方向&#xff1a; 1.1 代码与构建优…

作者头像 李华
网站建设 2026/4/18 4:00:08

工业互联网在电池拆解中的智能化升级路径

一、工业互联网与电池拆解的智能化融合&#xff1a;技术背景与必要性随着新能源汽车市场的蓬勃发展&#xff0c;动力电池的退役潮正逐步席卷全球。据行业统计数据&#xff0c;中国退役动力电池数量预计将在2025年突破80万吨&#xff0c;并持续增长。这一趋势使得动力电池回收利…

作者头像 李华
网站建设 2026/4/18 4:00:02

先睹为快 | 2026年3月国际学术会议一览表

2026年3月会议征稿主题广泛覆盖人工智能、计算智能、大模型与生成式AI、机器学习、数据挖掘、计算机技术与工程、算法、数据安全、通信技术等信息技术核心领域&#xff1b;同时深入拓展至低空经济、智慧交通、智能电网、电气控制、自动化工程、控制系统、机械工程等先进工程应用…

作者头像 李华
网站建设 2026/4/17 20:34:46

基于Spring Boot的怀来葡萄酒宣传网站的设计与实现

3 可行性研究与需求分析 3.1可行性分析 葡萄酒在网上宣传还是比较广泛存在&#xff0c;对怀来葡萄酒宣传网站的可行性分析基于当下的互联网背景&#xff0c;从经济、市场、技术、法律和用户使用上进行了调查&#xff0c;从此验证次系统开发的可行性。下面分别从以下几点进行分析…

作者头像 李华