私信
文章
73
评论
12
点赞
80
原创 52
翻译 4
转载 17

文章
关注
粉丝
收藏

个人分类:

   2019-03-30 00:53:42    2019-03-30 00:53:42   

python asyncio 协程 coroutine
### 介绍

终端使用的是 asyncio,单线程支持并发操作。

聊天终端,暂时仅提供登陆/注销/发送在线消息/发送离线消息/帮助功能(未完待续)

- 登陆命令:`auth userid password`(userid 为用户账号,用户数据存储在 mysql 中)
- 注销命令:`logout`
- 发送消息:`msg userid message`(支持在线和离线消息;当对方不在线时,离线消息存储到 redis 中,等用户上线之后再推送给该用户)
- 帮助命令:`help` / `?`

### 环境

- 系统:`CentOS7`
- 编程语言:`Python`
- 语言解释器:`Cpython-3.7.2`
- 数据库:`mysql-5.7` `redis-5.0.3`
- 缓存:`redis-5.0.3`

### 表结构

#### user表

```sql
CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) NOT NULL,
  `password` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=91 DEFAULT CHARSET=utf8;
```

### 服务器端代码

chat_server.py

```py
#coding:utf-8
######################
# Author: SheYinsong #
# Date: 2019-03-30   #
######################
import asyncio
import aiomysql
import aioredis
import sys
import struct
import ssl
import time
import logging

SERVER_ADDRESS = ('0.0.0.0', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

# Event loop shared by the whole server.
loop = asyncio.get_event_loop()

# TLS configuration
CERTFILE = 'ssl/server.crt'
KEYFILE = 'ssl/server.key'

# Wire format: a 2-byte unsigned length header followed by the UTF-8 payload.
# Network byte order is used explicitly so that server and client agree even
# when they run on machines with different endianness. The client must use
# the same header format.
FRAME_HEADER = struct.Struct('!H')
MAX_PAYLOAD = 0xFFFF

# Online users: userid -> StreamWriter, and the reverse mapping.
auth_userid_map_wstream = {}
auth_wstream_map_userid = {}

# mysql configuration
db_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'xxxxxxxx',
    'db': 'chat',
    'charset': 'utf8'
}

# SQL templates. The %s placeholders are filled in by the database driver
# (pass the values as ``args`` to db_query), never by string formatting.
QUERY_AUTH_SQL = 'select password from user where id=%s'
QUERY_USERID_IS_EXISTS_SQL = 'select id from user where id=%s'
QUERY_USERNAME_OF_USERID_SQL = 'select username from user where id=%s'

# redis configuration
#redis_host='localhost'
#redis_port=6379
redis_unix_socket = '/var/run/redis/redis.sock'

# redis key templates
KV_EXISTS_USERID = u'kv_exists_userid:userid:%s'
KV_USERID_GET_USERNAME = u'kv_userid_get_username:userid:%s'
LIST_USERMESSAGES_OF_USERID = u'list_usermessages_of_userid:userid:%s'

# Supported commands
cmd_list = ['auth', 'msg', 'logout']

# Command help text
cmd_help = """
===============================================
|                                             |
|                命令使用说明:                |
|                                             |
===============================================
|命令    参数    参数      (说明)             |
===============================================
|auth    UID     PASSWORD  (登录系统)         |
|msg     UID     MESSAGE   (给用户发送消息)   |
===============================================
|命令                      (说明)             |
===============================================
|logout                    (退出)             |
===============================================
"""

# Connection pools are created lazily, once, and reused for every query.
_db_pool = None
_redis_pool = None
_pool_lock = asyncio.Lock()


async def get_custom_time_string(t_format="%Y-%m-%d %X"):
    """Return the current local time formatted with *t_format*.

    The default format is "%Y-%m-%d %X".
    """
    return time.strftime(t_format)


async def get_db_pool():
    """Return the shared mysql connection pool, creating it on first use."""
    global _db_pool
    async with _pool_lock:
        if _db_pool is None:
            # autocommit lets read-only connections go back to the pool
            # outside a transaction, so they can be reused and every query
            # sees fresh data.
            _db_pool = await aiomysql.create_pool(autocommit=True, **db_config)
    return _db_pool


async def get_redis_pool():
    """Return the shared redis connection pool, creating it on first use."""
    global _redis_pool
    async with _pool_lock:
        if _redis_pool is None:
            _redis_pool = await aioredis.create_pool(
                redis_unix_socket,
                minsize=5,
                maxsize=10)
    return _redis_pool


async def close_pools():
    """Close both connection pools (called once at server shutdown)."""
    global _db_pool, _redis_pool
    for pool in (_db_pool, _redis_pool):
        if pool is not None:
            pool.close()
            await pool.wait_closed()
    _db_pool = None
    _redis_pool = None


async def db_query(sql, args=None):
    """Run a mysql query and return all rows.

    *sql* may contain %s placeholders; the matching values go in *args*
    (a tuple) and are escaped by the driver. With args=None the statement
    is executed as-is.
    """
    pool = await get_db_pool()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            log.debug('查询数据库,SQL->[ %s ] 参数->[ %r ]', sql, args)
            await cur.execute(sql, args)
            return await cur.fetchall()


async def redis_query(cmd, *args):
    """Execute a redis command and return its reply."""
    pool = await get_redis_pool()
    with await pool as conn:
        return await conn.execute(cmd, *args)


async def redis_dml(cmd, *args):
    """Execute a redis write command, discarding the reply."""
    await redis_query(cmd, *args)


async def get_username_of_userid(userid):
    """Return the username for *userid*, or '' when the user does not exist.

    The name is looked up in redis first and cached there after a
    database hit.
    """
    key = KV_USERID_GET_USERNAME % userid
    # A single GET replaces EXISTS + GET: one round trip, and no window in
    # which the key can disappear between the two commands.
    cached = await redis_query('get', key)
    if cached is not None:
        return cached.decode('utf-8')
    res = await db_query(QUERY_USERNAME_OF_USERID_SQL, (userid,))
    if res:
        username = res[0][0]
        await redis_dml('set', key, username)
        return username
    return ''


async def get_userid_of_wstream(wstream):
    """Return the userid logged in on *wstream*, or -1 if there is none."""
    return auth_wstream_map_userid.get(wstream, -1)


async def get_wstream_of_userid(userid):
    """Return the writer of an online *userid*, or '' if the user is offline."""
    return auth_userid_map_wstream.get(userid, '')


async def write_data(writer, data):
    """Send one framed message.

    Returns True on success. On a transport error the connection is
    cleaned up and False is returned.
    """
    b_data = data.encode('utf-8')
    if len(b_data) > MAX_PAYLOAD:
        # The header is 2 bytes, so a longer payload cannot be framed.
        # Truncate on a character boundary instead of raising struct.error.
        log.debug('消息超过最大长度,已截断')
        b_data = b_data[:MAX_PAYLOAD].decode('utf-8', 'ignore').encode('utf-8')
    try:
        writer.write(FRAME_HEADER.pack(len(b_data)) + b_data)
        await writer.drain()
        return True
    except Exception as e:
        log.debug(f'发送数据异常:[{e}]')
        await clean_wstream(writer)
        return False


async def read_data(reader):
    """Read one framed message; return '' on disconnect or error."""
    try:
        # readexactly() waits for the full header/payload. A plain read(n)
        # may return fewer than n bytes and would corrupt the framing.
        header = await reader.readexactly(FRAME_HEADER.size)
        (length,) = FRAME_HEADER.unpack(header)
        b_data = await reader.readexactly(length)
        return b_data.decode('utf-8')
    except asyncio.IncompleteReadError:
        # The peer closed the connection.
        return ''
    except Exception as e:
        log.debug(f'读取客户端数据异常:[{e}]')
        return ''


async def clean_wstream(writer):
    """Drop the login state bound to *writer* and close the stream."""
    userid = auth_wstream_map_userid.pop(writer, None)
    # Only remove the userid entry if it still points at this writer.
    if userid is not None and auth_userid_map_wstream.get(userid) is writer:
        del auth_userid_map_wstream[userid]
    writer.close()


async def user_auth(*args):
    """Check credentials; args are (userid, password).

    NOTE(review): passwords are stored and compared in plain text. A
    production system should store salted hashes instead.
    """
    userid = int(args[0])
    pwd = args[1]
    res = await db_query(QUERY_AUTH_SQL, (userid,))
    return bool(res) and res[0][0] == pwd


async def userid_is_exists(userid):
    """Return True if *userid* exists (redis cache first, then mysql)."""
    key = KV_EXISTS_USERID % userid
    if await redis_query('exists', key):
        return True
    res = await db_query(QUERY_USERID_IS_EXISTS_SQL, (userid,))
    if res:
        await redis_dml('set', key, '')
        return True
    return False


async def userid_is_online(userid):
    """Return True if *userid* currently has a logged-in connection."""
    return userid in auth_userid_map_wstream


async def offline_userid(operator_type, *args):
    """Log a user out; args[0] is the userid.

    operator_type selects the notice sent to the user:
    0 = kicked by a new login, 1 = logout, anything else = exit.
    The connection itself stays open so the user can log in again.
    """
    userid = int(args[0])
    writer = auth_userid_map_wstream.pop(userid, None)
    if writer is None:
        # The user is not online; nothing to do.
        return
    auth_wstream_map_userid.pop(writer, None)
    if operator_type == 0:
        msg = '有用户登录您的账号,您已被挤下线!'
    elif operator_type == 1:
        msg = '您已退出登录!'
    else:
        msg = '您已退出系统!'
    await write_data(writer, msg)


async def send_user_msg(writer, *args):
    """Send a chat message; args are (to_userid, word, word, ...).

    The message is delivered directly when the recipient is online,
    otherwise it is queued in redis. The sender always gets an echo.
    """
    to_userid = int(args[0])
    from_userid = auth_wstream_map_userid[writer]
    content = ' '.join(args[1:])
    if not await userid_is_exists(to_userid):
        await write_data(writer, '系统不存在该userid!')
        return
    from_username = await get_username_of_userid(from_userid)
    custom_time = await get_custom_time_string()
    send_content = f'{custom_time}\n[ {from_username}|UID:{from_userid} ]:{content}'
    if to_userid == from_userid:
        await write_data(writer, send_content)
        return
    to_writer = auth_userid_map_wstream.get(to_userid)
    if to_writer is not None and await write_data(to_writer, send_content):
        await write_data(writer, send_content)
        return
    # The recipient is offline (or the delivery failed): queue the message.
    log.debug("用户不在线,发送离线消息")
    await redis_dml('rpush', LIST_USERMESSAGES_OF_USERID % to_userid, send_content)
    await write_data(writer, f'[离线消息]\n{send_content}')


async def user_is_auth(writer):
    """Return True if the connection behind *writer* is logged in."""
    return writer in auth_wstream_map_userid


async def get_offline_msg_of_userid(userid):
    """Return the queued offline messages (bytes) of *userid*."""
    return await redis_query('lrange', LIST_USERMESSAGES_OF_USERID % userid, 0, -1)


async def push_offline_msg(writer, *args):
    """Deliver queued offline messages; args[0] is the userid.

    Only messages that were actually written are removed from the queue,
    so a connection error or a message queued during delivery loses
    nothing.
    """
    userid = int(args[0])
    delivered = 0
    for raw_message in await get_offline_msg_of_userid(userid):
        if not await write_data(writer, raw_message.decode('utf-8')):
            break
        delivered += 1
    if delivered:
        # Keep everything from index `delivered` on; redis removes the key
        # when the list becomes empty.
        await redis_dml('ltrim', LIST_USERMESSAGES_OF_USERID % userid, delivered, -1)


async def update_auth(writer, *args):
    """Record that args[0] (userid) is logged in on *writer*."""
    userid = int(args[0])
    auth_userid_map_wstream[userid] = writer
    auth_wstream_map_userid[writer] = userid


async def check_args_validity(writer, cmd, *args):
    """Validate the arguments of *cmd*, reporting problems to the client.

    Returns True when the arguments are acceptable.
    """
    if cmd == 'auth':
        if len(args) != 2:
            await write_data(writer, '提示:认证格式: [ auth userid pwd ]')
            return False
    elif cmd == 'msg':
        if len(args) < 2:
            await write_data(writer, '提示:消息格式: [ msg userid content ]')
            return False
    else:
        # Other commands take no arguments.
        return True
    try:
        int(args[0])
    except ValueError:
        await write_data(writer, 'userid为整数!')
        return False
    return True


async def handler_client(reader, writer):
    """Serve one client connection until it disconnects."""
    address = writer.get_extra_info('peername')
    log = logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('收到新连接')
    # The login prompt is suppressed right after an invalid command or
    # invalid arguments, because an error text was already sent.
    args_ok = True
    cmd_ok = True
    try:
        while True:
            if not await user_is_auth(writer) and cmd_ok and args_ok:
                await write_data(writer, u'请登陆!')
            data = await read_data(reader)
            if not data:
                break
            cmd, *args = data.split(' ')
            if cmd == '?' or cmd == 'help':
                await write_data(writer, f'{cmd_help}')
                cmd_ok = False
                continue
            elif cmd not in cmd_list:
                await write_data(writer, '输入的命令不支持,请使用 help 或 ? 查看帮助!')
                cmd_ok = False
                continue
            cmd_ok = True
            if not await user_is_auth(writer):
                if cmd != 'auth':
                    continue
                if not await check_args_validity(writer, cmd, *args):
                    args_ok = False
                    continue
                args_ok = True
                if await user_auth(*args):
                    # Kick any earlier session of the same account.
                    await offline_userid(0, *args)
                    await update_auth(writer, *args)
                    await write_data(writer, '认证成功!')
                    await push_offline_msg(writer, *args)
                else:
                    await write_data(writer, '认证失败!')
            else:
                if not await check_args_validity(writer, cmd, *args):
                    args_ok = False
                    continue
                args_ok = True
                if cmd == 'msg':
                    await send_user_msg(writer, *args)
                elif cmd == 'logout':
                    userid = await get_userid_of_wstream(writer)
                    await offline_userid(1, userid)
                elif cmd == 'auth':
                    await write_data(writer, '您已登录!')
    finally:
        # Without this a disconnected user would stay "online" in the maps
        # and messages would be written to a dead connection.
        await clean_wstream(writer)
        log.debug('连接关闭')


def main():
    """Start the TLS chat server and run it until interrupted."""
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)
    factory = asyncio.start_server(handler_client, *SERVER_ADDRESS, ssl=context)
    server = loop.run_until_complete(factory)
    log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        log.debug('关闭服务器.')
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.run_until_complete(close_pools())
        log.debug('关闭事件循环LOOP.')
        loop.close()


if __name__ == '__main__':
    main()
```

### 客户端

chat_client.py

```py
#coding:utf-8
######################
# Author: SheYinsong #
# Date: 2019-03-30   #
######################
import asyncio
from aioconsole import ainput
import concurrent.futures
import sys
import ssl
import time
import struct
import logging

# Logging configuration
logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

# TLS configuration
CERTFILE = 'ssl/server.crt'

# Chat server address. The certificate was issued for chat.unotes.co;
# use the domain name of your own certificate.
SERVER_ADDRESS = ('chat.unotes.co', 10000)

# Wire format: must match the server (2-byte length header in network
# byte order, then the UTF-8 payload).
FRAME_HEADER = struct.Struct('!H')
MAX_PAYLOAD = 0xFFFF

loop = asyncio.get_event_loop()


async def write_data(writer, data):
    """Send one framed message; return True on success, False on error."""
    b_data = data.encode('utf-8')
    if len(b_data) > MAX_PAYLOAD:
        # The 2-byte header cannot describe a longer payload; truncate on a
        # character boundary instead of raising struct.error.
        b_data = b_data[:MAX_PAYLOAD].decode('utf-8', 'ignore').encode('utf-8')
    try:
        writer.write(FRAME_HEADER.pack(len(b_data)) + b_data)
        await writer.drain()
        return True
    except Exception as e:
        log.debug(f'send failed: [{e}]')
        return False


async def read_data(reader):
    """Read one framed message; return '' on disconnect or error."""
    try:
        # readexactly() waits for the complete header/payload; read(n) may
        # return fewer bytes.
        header = await reader.readexactly(FRAME_HEADER.size)
        (length,) = FRAME_HEADER.unpack(header)
        b_data = await reader.readexactly(length)
        return b_data.decode('utf-8')
    except asyncio.IncompleteReadError:
        # The server closed the connection.
        return ''
    except Exception as e:
        log.debug(f'读取客户端数据异常:[{e}]')
        return ''


async def read(reader, once=False):
    """Print incoming messages; with once=True stop after the first one.

    Returns when the connection is closed. The server never sends an
    empty message, so an empty read means the connection is gone.
    """
    while True:
        data = await read_data(reader)
        if not data:
            # Stop instead of spinning on a closed connection.
            log.debug('connection closed by server')
            break
        if once:
            print(f'{data}\n', end='')
            break
        # \x08 erases the pending '>>>' prompt before printing the message.
        print(f'\x08\x08\x08{data}\n>>>', end='')


async def chat_client(address):
    """Connect to the chat server and forward console input to it."""
    log = logging.getLogger('chat_client')
    log.debug('connecting to {} port {}'.format(*address))
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    context.load_verify_locations(CERTFILE)
    reader, writer = await asyncio.open_connection(*address, ssl=context)
    # Show the server greeting before prompting for input.
    await read(reader, True)
    read_task = asyncio.create_task(read(reader))
    try:
        # The loop ends once the reader task has seen the connection close
        # or a send fails.
        while not read_task.done():
            data = await ainput('>>>')
            if not data:
                # An empty line asks for help.
                data = '?'
            if not await write_data(writer, data):
                break
    finally:
        read_task.cancel()
        writer.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(chat_client(SERVER_ADDRESS))
    except KeyboardInterrupt:
        pass
    except Exception as e:
        # Report connection/TLS errors instead of exiting silently.
        log.error(f'client stopped: [{e}]')
    finally:
        log.debug('closing event loop')
        loop.close()
```

### 运行服务器

```bash
$ python chat_server.py
```

```
asyncio: Using selector: EpollSelector
main: starting up on 0.0.0.0 port 10000
```

### 运行客户端1

#### 用户张三

```bash
$ python chat_client.py
```

```
请登陆!
>>>auth 10001 123456
认证成功!
>>>msg 10002 你好,我是张三
[离线消息]
2019-03-29 17:00:24
[ 张三|UID:10001 ]:你好,我是张三
>>>
```

### 运行客户端2

#### 用户李四

```bash
$ python chat_client.py
```

```
请登陆!
>>>auth 10002 123456
认证成功!
2019-03-29 17:00:24
[ 张三|UID:10001 ]:你好,我是张三
>>>msg 10001 你好,张三,我收到您的消息
2019-03-29 17:02:52
[ 李四|UID:10002 ]:你好,张三,我收到您的消息
>>>
```

### 查看客户端1

#### 用户张三

```bash
$ python chat_client.py
```

```
请登陆!
>>>auth 10001 123456
认证成功!
>>>msg 10002 你好,我是张三
[离线消息]
2019-03-29 17:00:24
[ 张三|UID:10001 ]:你好,我是张三
2019-03-29 17:02:52
[ 李四|UID:10002 ]:你好,张三,我收到您的消息
```
阅读 170 评论 0 收藏 0
阅读 170
评论 0
收藏 0


   2019-01-21 15:19:07    2019-01-21 15:19:07   

python
#### 分析

微信认证名中一般都会附带地区名,通过数据库的省市县名和微信名做存在判断(```注意:此方法存在一定概率的误判```)

#### 准备

需要省、市、县数据表的数据,需要的话请联系博主

#### 步骤

1. 判断对应的省名是否在微信认证名中,如果在,则匹配出数据,并且判断如果是直辖市,获取对应的市级名字、id
2. 如果省没匹配成功,则匹配市级数据的名字,如果成功,则获取对应的省名及对应省id
3. 如果省市都没匹配成功,则匹配县区级数据的名字,如果成功,则获取对应的省市名及对应省市id

##### 数据表结构

```sql
CREATE TABLE `province` (
  `id` bigint(19) NOT NULL AUTO_INCREMENT,
  `name` varchar(32) NOT NULL COMMENT '省份名称',
  `short_name` varchar(32) DEFAULT NULL COMMENT '省份简称',
  `remark` varchar(255) DEFAULT NULL COMMENT '备注',
  `created_at` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=36 DEFAULT CHARSET=utf8;

CREATE TABLE `city` (
  `id` bigint(19) NOT NULL AUTO_INCREMENT COMMENT '标识',
  `name` varchar(32) NOT NULL COMMENT '城市名称',
  `short_name` varchar(32) DEFAULT NULL COMMENT '城市简称',
  `province_id` bigint(19) NOT NULL COMMENT '所属省份标识',
  `level` int(10) NOT NULL COMMENT '城市等级(0未知,1:一线,2:二线,3:三线,4:四线)',
  `remark` varchar(255) DEFAULT NULL COMMENT '备注',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `name` (`name`,`province_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=7471 DEFAULT CHARSET=utf8;

CREATE TABLE `district` (
  `id` bigint(19) NOT NULL AUTO_INCREMENT COMMENT '标识',
  `name` varchar(32) NOT NULL COMMENT '区县名称',
  `short_name` varchar(32) DEFAULT NULL COMMENT '区县简称',
  `city_id` bigint(19) NOT NULL COMMENT '所属城市标识',
  `remark` varchar(255) DEFAULT NULL COMMENT '备注',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `name` (`name`,`city_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=10069 DEFAULT CHARSET=utf8;
```

##### python脚本

```bash
vim get_loc.py
```

```python
#coding:utf-8
import pymysql

db_host = "xx.xx.xx.xx"
db_user = "root"
db_pass = "xxxxx"
db_port = 3306
db_name = "xxxx"

# Input: one WeChat verified name per line. Output: the name plus matched ids/names.
INPUT_PATH = "weixin_auth.txt"
OUTPUT_PATH = "weixin_auth_loc.txt"

# Municipalities are stored in the province table; the ids of their
# city-level rows are hard-coded here.
MUNICIPALITY_CITY_IDS = {
    "北京市": 7151,
    "上海市": 7122,
    "天津市": 7182,
    "重庆市": 7430,
}


def execute_query_sql(sql, args=None):
    """Run a query on a fresh connection and return all rows.

    *args* are optional values for %s placeholders in *sql*; they are
    escaped by the driver.
    """
    db = pymysql.connect(host=db_host, port=db_port, user=db_user,
                         passwd=db_pass, db=db_name)
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, args)
            return cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        db.close()


def strip_region_suffix(name):
    """Remove the 市/县 characters so that e.g. "杭州市" also matches "杭州"."""
    return name.replace("市", "").replace("县", "")


def match_location(auth_name, province_keys, city_keys, district_keys, city_province):
    """Return (pro_id, city_id, district_id) for *auth_name*; -1 means unknown.

    Provinces are tried first, then cities, then districts; the first
    region whose key occurs in the name wins.
    """
    for key, pro_id, pro_name in province_keys:
        if key in auth_name:
            return pro_id, MUNICIPALITY_CITY_IDS.get(pro_name, -1), -1
    for key, city_id, pro_id in city_keys:
        if key in auth_name:
            return pro_id, city_id, -1
    for key, district_id, city_id in district_keys:
        if key in auth_name:
            return city_province.get(city_id, -1), city_id, district_id
    return -1, -1, -1


def main():
    """Annotate every verified name in INPUT_PATH with its location."""
    # Load the three region tables once instead of re-querying them for
    # every input line. Ordering by id makes "first match wins" repeatable.
    province_rows = execute_query_sql('select id,name from province order by id;')
    city_rows = execute_query_sql('select id,name,province_id from city order by id;')
    district_rows = execute_query_sql('select id,name,city_id from district order by id;')

    province_names = {pro_id: name for pro_id, name in province_rows}
    city_names = {city_id: name for city_id, name, _ in city_rows}
    city_province = {city_id: pro_id for city_id, _, pro_id in city_rows}
    district_names = {district_id: name for district_id, name, _ in district_rows}

    # Match keys. Empty keys are dropped because '' is contained in every
    # string and would match every line.
    province_keys = [(name[0:2], pro_id, name)
                     for pro_id, name in province_rows if name[0:2]]
    city_keys = [(strip_region_suffix(name), city_id, pro_id)
                 for city_id, name, pro_id in city_rows
                 if pro_id > 0 and strip_region_suffix(name)]
    district_keys = [(strip_region_suffix(name), district_id, city_id)
                     for district_id, name, city_id in district_rows
                     if city_id > 0 and strip_region_suffix(name)]

    with open(INPUT_PATH, 'r', encoding="utf-8") as src, \
            open(OUTPUT_PATH, 'a', encoding="utf-8") as dst:
        for line_no, line in enumerate(src, start=1):
            auth_name = line.strip()
            pro_id, city_id, district_id = match_location(
                auth_name, province_keys, city_keys, district_keys, city_province)
            # Resolve names from ids; unknown ids give an empty name.
            pro_name = province_names.get(pro_id, "")
            city_name = city_names.get(city_id, "")
            district_name = district_names.get(district_id, "")
            record = ",".join([auth_name, str(pro_id), str(city_id), str(district_id),
                               pro_name, city_name, district_name]) + "\n"
            print("当前执行到第" + str(line_no) + "行")
            print(record)
            # Save the record to the output file.
            dst.write(record)


if __name__ == "__main__":
    main()
```
阅读 73 评论 0 收藏 0
阅读 73
评论 0
收藏 0