#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
Recognize captchas fetched from the network using a self-hosted recognition service.

Parameters that need to be configured:
    remote_url = "https://www.xxxxxxx.com/getImg"  URL of the remote captcha image
    rec_times = 1  number of recognition rounds
"""
import datetime
import json
import os
import time
from io import BytesIO

import requests
from flask import Flask, request, Response

from log_ware import LogWare

logger = LogWare().get_logger()


class RecognizeOnlineError(Exception):
    """Raised when a captcha cannot be downloaded or recognized."""


# Flask application object
app = Flask(__name__)
basedir = os.path.abspath(os.path.dirname(__file__))

# Cookie passed in by the connecting client
jsession_id = ''

with open("conf/sample_config.json", "r") as f:
    sample_conf = json.load(f)

# Configuration parameters
env = sample_conf["env"]  # environment name; 'dev' enables saving of recognized images
remote_url = sample_conf["remote_url"]  # URL of the remote captcha image
image_suffix = sample_conf["image_suffix"]  # image file suffix
# Directory in which images fetched from the remote captcha URL are saved after recognition
online_save_path = sample_conf["online_image_dir"]
webserver_recognize_url = sample_conf['webserver_recognize_url']  # recognition server host
webserver_recognize_port = sample_conf['webserver_recognize_port']  # recognition server port
request_recognize_ip = sample_conf['request_recognize_ip']  # host this service binds to, for external callers
request_recognize_port = sample_conf['request_recognize_port']  # port this service binds to

# Upper bound on download attempts per captcha, so that an unreachable captcha
# host cannot block a request forever.
DOWNLOAD_MAX_RETRIES = 5
# Pause between failed download attempts, in seconds (avoids a busy retry loop).
DOWNLOAD_RETRY_DELAY = 0.5
# Timeout for the call to the recognition service, in seconds.
RECOGNIZE_TIMEOUT = 30


def _download_captcha(req_headers, remote_url, max_retries):
    """Download one captcha image and return its raw bytes.

    Retries on network errors and on empty response bodies, at most
    ``max_retries`` times (at least one attempt is always made).

    Raises:
        RecognizeOnlineError: if no non-empty body was obtained.
    """
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        # The millisecond timestamp makes every request URL unique.
        req_url = "{}?timeStamp={}".format(remote_url, int(round(time.time() * 1000)))
        try:
            response = requests.request("GET", req_url, headers=req_headers, timeout=6)
        except requests.RequestException as ee:
            logger.error(ee)
        else:
            # Check the raw bytes: the body is an image, so decoding it as
            # text only to test for emptiness is wasted work.
            if response.content:
                return response.content
            logger.warning("retry, response body is empty")
        if attempt < attempts:
            time.sleep(DOWNLOAD_RETRY_DELAY)
    raise RecognizeOnlineError(
        "failed to download captcha from {} after {} attempts".format(remote_url, attempts))


def _recognize_image(image_bytes, image_file_name):
    """Send image bytes to the recognition service.

    Returns:
        A ``(predict_text, elapsed_ms)`` tuple, where ``elapsed_ms`` is the
        duration of the HTTP call in milliseconds.

    Raises:
        RecognizeOnlineError: if the call fails or the reply has no "value" field.
    """
    url = "http://{}:{}/b".format(webserver_recognize_url, str(webserver_recognize_port))
    files = {'image_file': (image_file_name, BytesIO(image_bytes), 'application')}
    try:
        started = time.time()
        r = requests.post(url=url, files=files, timeout=RECOGNIZE_TIMEOUT)
        elapsed_ms = int((time.time() - started) * 1000)
        logger.debug("远程下载图片,调用本地识别服务,接口响应: %s", r.text)
        return json.loads(r.text)["value"], elapsed_ms
    except (requests.RequestException, ValueError, KeyError, TypeError) as err:
        raise RecognizeOnlineError("recognition service call failed: {}".format(err)) from err


def recognize_captcha(jsession_id, remote_url, rec_times, save_path, image_suffix,
                      max_retries=DOWNLOAD_MAX_RETRIES):
    """Download a captcha from ``remote_url`` and recognize it ``rec_times`` times.

    Each round downloads a fresh image, posts it to the recognition service and,
    when the configured environment is 'dev', saves the image under ``save_path``
    with the predicted text in the file name.

    Args:
        jsession_id: session id sent to the captcha host in the Cookie header.
        remote_url: URL of the remote captcha image.
        rec_times: number of download-and-recognize rounds.
        save_path: directory for saved images (used only in the 'dev' environment).
        image_suffix: image file suffix, without the dot.
        max_retries: maximum download attempts per round.

    Returns:
        The predicted text of the last round, or None if ``rec_times`` is not positive.

    Raises:
        RecognizeOnlineError: if a download or a recognition call fails.
    """
    image_file_name = 'captcha.{}'.format(image_suffix)

    # Adjust the headers to the target site as needed.
    headers = {
        # 'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36",
        "Host": "app.singlewindow.cn",
        'Referer': 'https://app.singlewindow.cn/ceb2pubweb/sw/personalAmount',
        'Cookie': "jsessionid={}".format(jsession_id)
    }

    # Stays None when the loop body never runs (rec_times <= 0).
    predict_text = None
    for index in range(rec_times):
        # Download
        image_bytes = _download_captcha(headers, remote_url, max_retries)

        # Recognize
        predict_text, elapsed_ms = _recognize_image(image_bytes, image_file_name)

        # Recognition result
        now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        logger.debug("【%s】 index:%d 耗时:%s ms 预测结果:%s", now_time, index, elapsed_ms, predict_text)

        if env.lower() == 'dev':
            # Save the image; the timestamp keeps file names unique.
            img_name = "{}_{}.{}".format(predict_text, str(time.time()).replace(".", ""), image_suffix)
            os.makedirs(save_path, exist_ok=True)
            path = os.path.join(save_path, img_name)
            with open(path, "wb") as img_file:
                img_file.write(image_bytes)

    logger.debug("============== online recognized end ==============")
    return predict_text


def response_headers(content):
    """Wrap ``content`` in a Response that allows cross-origin access from any origin."""
    resp = Response(content)
    resp.headers['Access-Control-Allow-Origin'] = '*'
    return resp


@app.route('/rec', methods=['GET', 'POST'])
def request_recongnize():
    """Handle /rec: recognize one captcha for the caller's ``jsessionid``.

    The ``jsessionid`` parameter is read from the form body for POST requests
    and from the query string for GET requests. The reply is a JSON string:
    ``captcha_text`` and ``jsessionid`` on success, or ``error_code`` and
    ``error_msg`` on failure.
    """
    if request.method == 'POST':
        # .get() rather than [...]: a missing field must produce the 1002
        # error below, not an automatic 400 response.
        session_id = request.form.get('jsessionid')
    elif request.method == 'GET':
        session_id = request.args.get('jsessionid')
    else:
        content = json.dumps({"error_code": "1000", "error_msg": "只能是GET,POST请求"})
        return response_headers(content)

    if not session_id:
        logger.debug("缺少请求参数jsessionid")
        content = json.dumps({"error_code": "1002", "error_msg": "缺少请求参数jsessionid"})
        return response_headers(content)

    rec_times = 1
    try:
        captcha_text = recognize_captcha(session_id, remote_url, rec_times, online_save_path, image_suffix)
    except RecognizeOnlineError as err:
        # Report failures in the same JSON error format as the other branches.
        logger.error(err)
        content = json.dumps({"error_code": "1001", "error_msg": "验证码识别失败"})
        return response_headers(content)

    content = json.dumps({"captcha_text": captcha_text, "jsessionid": session_id})
    logger.debug("返回验证码:%s,请求jsessionid:%s", captcha_text, session_id)
    return response_headers(content)


if __name__ == '__main__':
    app.run(
        host=request_recognize_ip,
        port=request_recognize_port,
        # The interactive debugger allows code execution from the browser,
        # so it is enabled only in the 'dev' environment.
        debug=(env.lower() == 'dev')
    )