#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
Recognize captchas fetched from the network by calling a self-hosted recognition service.

Parameters that need to be configured:
    remote_url = "https://www.xxxxxxx.com/getImg"  URL of the captcha image
    rec_times = 1  number of recognition rounds
"""
- import datetime
- import requests
- from io import BytesIO
- import time
- import json
- import os
- from flask import Flask, request, Response
- from log_ware import LogWare
# Module-wide logger, obtained from the project's LogWare helper.
logger = LogWare().get_logger()
class RecognizeOnlineError(Exception):
    """Exception type reserved for online captcha recognition failures.

    NOTE(review): purpose inferred from the class name; confirm against the
    call sites that raise or catch it.
    """
# Flask application object
app = Flask(__name__)
basedir = os.path.abspath(os.path.dirname(__file__))

# Cookie value passed along by the connecting client
jsession_id = ''

# Runtime settings; the path is resolved against the current working directory.
with open("conf/sample_config.json", "r") as f:
    sample_conf = json.load(f)

# Configuration parameters
env = sample_conf["env"]  # deployment environment (compared against 'dev' elsewhere)
remote_url = sample_conf["remote_url"]  # URL the captcha image is downloaded from
image_suffix = sample_conf["image_suffix"]  # image file extension
# Directory where images fetched from the remote captcha URL are saved after recognition
online_save_path = sample_conf["online_image_dir"]
webserver_recognize_url = sample_conf['webserver_recognize_url']  # host of the recognition server
webserver_recognize_port = sample_conf['webserver_recognize_port']  # port of the recognition server
request_recognize_ip = sample_conf['request_recognize_ip']  # address this service binds to (called by external programs)
request_recognize_port = sample_conf['request_recognize_port']  # port this service listens on
# Seconds to wait for the remote captcha download.
_DOWNLOAD_TIMEOUT = 6
# Seconds to wait for the recognition service to answer.
_RECOGNIZE_TIMEOUT = 10
# Pause between failed download attempts so the remote host is not hammered.
_RETRY_DELAY = 0.5


def _download_captcha(remote_url, headers, max_retries):
    """Download one captcha image and return the non-empty HTTP response.

    Makes at most ``max_retries`` attempts and raises RecognizeOnlineError
    when none of them yields a non-empty body.
    """
    last_error = "no attempt was made"
    for attempt in range(1, max_retries + 1):
        # A millisecond timestamp is appended to every request
        # (presumably as a cache-buster -- confirm with the remote API).
        req_url = "{}?timeStamp={}".format(remote_url, int(round(time.time() * 1000)))
        try:
            response = requests.request("GET", req_url, headers=headers, timeout=_DOWNLOAD_TIMEOUT)
        except requests.RequestException as err:
            last_error = err
            logger.error("captcha download attempt %d/%d failed: %s", attempt, max_retries, err)
        else:
            # Same emptiness check as ``response.text`` but without decoding
            # the binary image as text.
            if response.content:
                return response
            last_error = "empty response body"
            logger.warning("retry, response.text is empty")
        if attempt < max_retries:
            time.sleep(_RETRY_DELAY)
    raise RecognizeOnlineError(
        "failed to download captcha from {} after {} attempt(s): {}".format(
            remote_url, max_retries, last_error))


def recognize_captcha(jsession_id, remote_url, rec_times, save_path, image_suffix, max_retries=5):
    """Download a captcha image and have the recognition service read it.

    For each of the ``rec_times`` rounds a fresh image is downloaded from
    ``remote_url`` (sending ``jsession_id`` as the ``jsessionid`` cookie) and
    posted to the recognition server configured at module level.  When the
    configured environment is ``dev`` the image is also written to
    ``save_path`` under a name that starts with the predicted text.

    :param jsession_id: session id sent in the Cookie header of the download.
    :param remote_url: URL the captcha image is fetched from.
    :param rec_times: number of download-and-recognize rounds.
    :param save_path: directory images are saved to in the ``dev`` environment.
    :param image_suffix: image file extension (without the dot).
    :param max_retries: download attempts per round before giving up.
    :return: the text predicted in the final round, or ``None`` when
        ``rec_times`` is not positive.
    :raises RecognizeOnlineError: when an image cannot be downloaded within
        ``max_retries`` attempts.

    Errors from the recognition request itself (connection failures, timeout,
    malformed JSON, missing ``value`` key) propagate unchanged.
    """
    image_file_name = 'captcha.{}'.format(image_suffix)

    # Adjust these headers to whatever the target site requires.
    headers = {
        "Host": "app.singlewindow.cn",
        'Referer': 'https://app.singlewindow.cn/ceb2pubweb/sw/personalAmount',
        'Cookie': "jsessionid={}".format(jsession_id),
    }

    # Stays None when no round runs, instead of being an unbound local.
    predict_text = None
    for index in range(rec_times):
        # Download
        response = _download_captcha(remote_url, headers, max_retries)

        # Recognize
        started = time.time()
        url = "http://{}:{}/b".format(webserver_recognize_url, str(webserver_recognize_port))
        files = {'image_file': (image_file_name, BytesIO(response.content), 'application')}
        r = requests.post(url=url, files=files, timeout=_RECOGNIZE_TIMEOUT)
        finished = time.time()

        # Recognition result
        logger.debug("远程下载图片,调用本地识别服务,接口响应: %s", r.text)
        predict_text = json.loads(r.text)["value"]
        now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        logger.debug("【%s】 index:%d 耗时:%s ms 预测结果:%s",
                     now_time, index, int((finished - started) * 1000), predict_text)

        if env.lower() == 'dev':
            # Keep the image, named after the prediction, for later inspection.
            img_name = "{}_{}.{}".format(predict_text, str(time.time()).replace(".", ""), image_suffix)
            path = os.path.join(save_path, img_name)
            with open(path, "wb") as f:
                f.write(response.content)
        logger.debug("============== online recognized end ==============")
    return predict_text
def response_headers(content):
    """Wrap ``content`` in a Flask Response that permits any origin.

    :param content: body of the response.
    :return: the Response with ``Access-Control-Allow-Origin`` set to ``*``.
    """
    cors_response = Response(content)
    cors_response.headers['Access-Control-Allow-Origin'] = '*'
    return cors_response
@app.route('/rec', methods=['GET', 'POST'])
def request_recongnize():
    """Handle ``/rec``: recognize one captcha for the caller's session.

    The ``jsessionid`` parameter is read from the form body on POST and from
    the query string on GET.  The JSON reply is one of:

    * ``{"captcha_text": ..., "jsessionid": ...}`` on success,
    * ``{"error_code": "1002", ...}`` when ``jsessionid`` is missing or empty,
    * ``{"error_code": "1000", ...}`` for any method other than GET/POST.

    :return: a Response built by ``response_headers``.
    """
    if request.method not in ('GET', 'POST'):
        content = json.dumps({"error_code": "1000", "error_msg": "只能是GET,POST请求"})
        return response_headers(content)

    # Use .get() for both sources: indexing request.form with a missing key
    # aborts the request with a 400 before the 1002 error below can be sent.
    params = request.form if request.method == 'POST' else request.args
    jsession_id = params.get('jsessionid')
    if not jsession_id:
        logger.debug("缺少请求参数jsessionid")
        content = json.dumps({"error_code": "1002", "error_msg": "缺少请求参数jsessionid"})
        return response_headers(content)

    rec_times = 1
    captcha_text = recognize_captcha(jsession_id, remote_url, rec_times, online_save_path, image_suffix)
    content = json.dumps({"captcha_text": captcha_text, "jsessionid": jsession_id})
    logger.debug("返回验证码:%s,请求jsessionid:%s", captcha_text, jsession_id)
    return response_headers(content)
if __name__ == '__main__':
    # The interactive debugger lets anyone who can reach the port run
    # arbitrary Python, so switch it on only for the 'dev' environment.
    app.run(
        host=request_recognize_ip,
        port=request_recognize_port,
        debug=env.lower() == 'dev',
    )
|