本文共 13552 字,大约阅读时间需要 45 分钟。
create table user (id int auto_increment primary key,name varchar(32) not null,passwd varchar(16) default '000000');
create table hist (id int auto_increment primary key,name varchar(32) not null,word varchar(64) not null,time varchar(64));
create table words (id int auto_increment primary key,word varchar(64),interpret text);
import pymysql import re def main(): # 获取词典文件流对象 f = open('dict.txt') # 链接数据库并创建游标对象 db = pymysql.connect\ ('localhost','root','123456','dict') cursor = db.cursor() # 循环读取每一行内容 for line in f: try: L = re.split("[ ]+",line) except Exception: pass sql = "insert into words (word,interpret)\ values ('%s','%s')"%(L[0],' '.join(L[1:])) # 处理破坏SQL的特殊字符 try: cursor.execute(sql) db.commit() except Exception: db.rollback() f.close()if __name__ == "__main__": main()
#!/usr/bin/env python3#coding=utf-8'''name : Parisdate : 2018-8-27email : 1546079656@qq.commodules: python3.5 mysql pymysqlThis is a dict project for AID'''from socket import *import os import pymysqlimport timeimport sys import signal DICT_TEXT = "./dict.txt"HOST = '0.0.0.0'PORT = 8000ADDR = (HOST,PORT)#主控制流程def main(): #连接数据库 db = pymysql.connect\ ('localhost','root','123456','dict') #创建流式套接字 s = socket() s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(ADDR) s.listen(5) #或略子进程退出 signal.signal(signal.SIGCHLD,signal.SIG_IGN) while True: try: c,addr = s.accept() print("Connect from",addr) except KeyboardInterrupt: s.close() sys.exit("服务器退出") except Exception as e: print(e) continue #创建子进程处理客户端请求 pid = os.fork() if pid == 0: s.close() do_child(c,db) else: c.close()def do_child(c,db): #循环接收请求 while True: data = c.recv(128).decode() print("Request:",data) if (not data) or data[0] == 'E': c.close() sys.exit(0) elif data[0] == 'R': do_register(c,db,data) elif data[0] == "L": do_login(c,db,data) elif data[0] == 'Q': do_query(c,db,data) elif data[0] == 'H': do_history(c,db,data)def do_register(c,db,data): passdef do_login(c,db,data): passdef do_query(c,db,data): passdef do_history(c,db,data): pass if __name__ == "__main__": main()客户端:
#!/usr/bin/env python3#coding=utf-8from socket import *import sys import getpassdef main(): if len(sys.argv) < 3: print("argv is error") return HOST = sys.argv[1] PORT = int(sys.argv[2]) ADDR = (HOST,PORT) s = socket() s.connect(ADDR) while True: print('''\n ===========Welcome========= --1.注册 2.登录 3.退出-- =========================== ''') try: cmd = int(input("输入选项>>")) except Exception: print("输入命令错误") continue if cmd not in [1,2,3]: print("对不起,没有该命令") sys.stdin.flush() #清除输入 continue elif cmd == 1: name = do_register(s) if name != 1: print("注册成功,直接登录!") login(s,name) else: print("注册失败!") elif cmd == 2: name = do_login(s) if name != 1: print("登录成功!") login(s,name) else: print("登录失败!") elif cmd == 3: s.send(b"E") sys.exit("谢谢使用")def do_register(s): passdef do_login(s): passdef login(s,name): passdef do_query(s,name): passdef do_history(s,name): passif __name__ == "__main__": main()
#!/usr/bin/env python3#coding=utf-8'''name : Parisdate : 2018-8-27email : 1546079656@qq.commodules: python3.5 mysql pymysqlThis is a dict project for AID'''from socket import *import os import pymysqlimport timeimport sys import signal DICT_TEXT = "./dict.txt"HOST = '0.0.0.0'PORT = 8000ADDR = (HOST, PORT)# 主控制流程def main(): # 连接数据库 db = pymysql.connect\ ('localhost', 'root', '123456', 'dict') # 创建流式套接字 s = socket() s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) s.bind(ADDR) s.listen(5) # 或略子进程退出 signal.signal(signal.SIGCHLD, signal.SIG_IGN) while True: try: c,addr = s.accept() print("Connect from", addr) except KeyboardInterrupt: s.close() sys.exit("服务器退出") except Exception as e: print(e) continue # 创建子进程处理客户端请求 pid = os.fork() if pid == 0: s.close() do_child(c,db) else: c.close()def do_child(c,db): # 循环接收请求 while True: data = c.recv(128).decode() print("Request:", data) if (not data) or data[0] == 'E': c.close() sys.exit(0) elif data[0] == 'R': do_register(c, db, data) elif data[0] == "L": do_login(c, db, data) elif data[0] == 'Q': do_query(c, db, data) elif data[0] == 'H': do_history(c, db, data)def do_register(c, db, data): l = data.split(' ') name = l[1] passwd = l[2] cursor = db.cursor() sql = \ "select * from user where name='%s'" % name cursor.execute(sql) r = cursor.fetchone() if r != None: c.send(b'EXISTS') return sql = "insert into user (name, passwd)\ values ('%s', '%s')" % (name, passwd) try: cursor.execute(sql) db.commit() c.send(b'OK') except: db.rollback() c.send(b'FALL') return else: print("%s注册成功" % name)def do_login(c, db, data): l = data.split(' ') name = l[1] passwd = l[2] cursor = db.cursor() sql = "select * from user where \ name='%s' and passwd='%s'" % (name, passwd) cursor.execute(sql) r = cursor.fetchone() if r == None: c.send('用户名或密码不正确'.encode()) else: c.send(b'OK')def do_query(c, db, data): l = data.split(' ') name = l[1] word = l[2] cursor = db.cursor() def insert_history(): tm = time.ctime() sql = "insert into hist (name, word, time)\ values ('%s', '%s', '%s')" % (name, word, tm) try: cursor.execute(sql) db.commit() except: db.rollback() return try: f = open(DICT_TEXT, 'rb') except: c.send("500 服务端异常".encode()) return while True: line = f.readline().decode() w = line.split(' ')[0] if (not line) or w > word: c.send("没找到该单词".encode()) break elif w == word: c.send(b'OK') time.sleep(0.1) c.send(line.encode()) insert_history() break f.close()def do_history(c, db, data): name = data.split(' ')[1] cursor = db.cursor() try: sql = "select * from hist \ where name='%s'" % name cursor.execute(sql) r = cursor.fetchall() if not r: c.send('没有历史记录'.encode()) return else: c.send(b'OK') except: c.send("数据库查询错误".encode()) return n = 0 for i in r: n += 1 #最多显示10条 if n > 10: break time.sleep(0.1) msg = "%s %s %s" % (i[1], i[2], i[3]) c.send(msg.encode()) time.sleep(0.1) c.send(b'##') if __name__ == "__main__": main()客户端:
#!/usr/bin/env python3#coding=utf-8from socket import *import sysimport getpassdef main(): if len(sys.argv) < 3: print("argv is error") return HOST = sys.argv[1] PORT = int(sys.argv[2]) ADDR = (HOST, PORT) s = socket() s.connect(ADDR) while True: print('''\n ===========Welcome========= --1.注册 2.登录 3.退出-- =========================== ''') try: cmd = int(input("输入选项>>")) except Exception: print("输入命令错误") continue if cmd not in [1, 2, 3]: print("对不起,没有该命令") sys.stdin.flush() # 清除输入 continue elif cmd == 1: name = do_register(s) if name != 1: print("注册成功,直接登录!") login(s, name) else: print("注册失败!") elif cmd == 2: name = do_login(s) if name != 1: print("登录成功!") login(s, name) else: print("登录失败!") elif cmd == 3: s.send(b"E") sys.exit("谢谢使用")def do_register(s): while True: name = input("用户名:") passwd = getpass.getpass("密 码:") passwd1 = getpass.getpass("确认密码:") if (' ' in name) or (' ' in passwd): print("用户名密码不允许空格") continue if passwd != passwd1: print("两次密码不一致") continue msg = "R {} {}".format(name, passwd) # 发送请求 s.send(msg.encode()) # 接收回复 data = s.recv(128).decode() if data == "OK": return name elif data == 'EXISTS': print("该用户已存在") return 1 else: return 1def do_login(s): name = input("用户名:") passwd = getpass.getpass("密 码:") msg = "L {} {}".format(name, passwd) s.send(msg.encode()) data = s.recv(128).decode() if data == 'OK': return name else: print(data) return 1def login(s, name): while True: print('''\n ===========查询界面============ 1.查词 2.历史记录 3.注销 ============================= ''') try: cmd = int(input("输入选项>>")) except Exception: print("命令错误") continue if cmd not in [1,2,3]: print("对不起,没有该命令") sys.stdin.flush() # 清除输入 continue elif cmd == 1: do_query(s, name) elif cmd == 2: do_history(s, name) elif cmd == 3: returndef do_query(s, name): while True: word = input("单词:") if word == "q": break msg = "Q {} {}".format(name, word) s.send(msg.encode()) data = s.recv(128).decode() if data == 'OK': data = s.recv(2048).decode() print(data) else: print(data)def do_history(s, name): msg = "H {}".format(name) s.send(msg.encode()) data = s.recv(128).decode() if data == 'OK': while True: data = s.recv(1024).decode() if data == "##": break print(data) else: print(data)if __name__ == "__main__": main()
转载地址:http://druyo.baihongyu.com/