You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

371 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import _thread
import queue
import time
import multiprocessing as mp
from multiprocessing import Process # abc
from multiprocessing import Value, Manager
import tensorflow as tf # abc
import numpy as np # abc
from captcha.image import ImageCaptcha # abc
from PIL import Image
import random
import matplotlib.pyplot as plt
import os
import threading
import datetime
number = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
'u',
'v', 'w', 'x', 'y', 'z']
ALPHABET = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
'U',
'V', 'W', 'X', 'Y', 'Z']
IMAGE_HEIGHT=60
IMAGE_WIDTH =160
MAX_CAPTCHA =4
CHAR_SET_LEN=63
import sys
class Logger(object):
def __init__(self, filename="log.txt"):
self.terminal = sys.stdout
self.log = open(filename, "a")
def write(self, message):
self.terminal.write(message)
self.log.write(message)
self.log.flush()
def random_captcha_text(char_set=number + alphabet + ALPHABET, captcha_size=4):
captcha_text = []
for i in range(captcha_size):
c = random.choice(char_set)
captcha_text.append(c)
return captcha_text
def gen_captcha_text_and_image(i=0):
# 创建图像实例对象
image = ImageCaptcha()
# 随机选择4个字符
captcha_text = random_captcha_text()
# array 转化为 string
captcha_text = ''.join(captcha_text)
# 生成验证码
captcha = image.generate(captcha_text)
if i % 100 == 0:
image.write(captcha_text, "D:\\DL\\captcha pics" + captcha_text + '.jpg')
captcha_image = Image.open(captcha)
captcha_image = np.array(captcha_image)
return captcha_text, captcha_image
def convert2gray(img):
if len(img.shape) > 2:
gray = np.mean(img, -1)
return gray
else:
return img
# 文本转向量
def text2vec(text):
text_len = len(text)
if text_len > MAX_CAPTCHA:
raise ValueError('验证码最长4个字符')
vector = np.zeros(MAX_CAPTCHA * CHAR_SET_LEN)
def char2pos(c):
if c == '_':
k = 62
return k
k = ord(c) - 48
if k > 9:
k = ord(c) - 55
if k > 35:
k = ord(c) - 61
if k > 61:
raise ValueError('No Map')
return k
for i, c in enumerate(text):
idx = i * CHAR_SET_LEN + char2pos(c)
vector[idx] = 1
return vector
# 向量转回文本
def vec2text(vec):
char_pos = vec[0]
text = []
for i, c in enumerate(char_pos):
char_idx = c % CHAR_SET_LEN
if char_idx < 10:
char_code = char_idx + ord('0')
elif char_idx < 36:
char_code = char_idx - 10 + ord('A')
elif char_idx < 62:
char_code = char_idx - 36 + ord('a')
elif char_idx == 62:
char_code = ord('_')
else:
raise ValueError('error')
text.append(chr(char_code))
return "".join(text)
# 生成一个训练batch
def get_next_batchDetail(batch_size=64):
re=[]
for loop in range(20):
global IMAGE_HEIGHT
global IMAGE_WIDTH
global MAX_CAPTCHA
global CHAR_SET_LEN
batch_x = np.zeros([batch_size, IMAGE_HEIGHT * IMAGE_WIDTH])
batch_y = np.zeros([batch_size, MAX_CAPTCHA * CHAR_SET_LEN])
def wrap_gen_captcha_text_and_image(i):
while True:
text, image = gen_captcha_text_and_image(i)
if image.shape == (60, 160, 3):
return text, image
else:
print("fail")
for i in range(batch_size):
text, image = wrap_gen_captcha_text_and_image(i)
image = convert2gray(image)
batch_x[i, :] = image.flatten() / 255
batch_y[i, :] = text2vec(text)
re.append((batch_x, batch_y))
return re
def spawn_n_processes(size):
global batchLake
while True:
while(len(batchLake[size])>1000):
time.sleep(1)
re=[]
print("batch:" + str(size) + "池大小" + str(len(batchLake[size])))
pool=mp.Pool(processes=12)
for loop in range(12):
re.append(pool.apply_async(get_next_batchDetail,[size,]))
pool.close()
pool.join()
for i in range(len(re)):
batchLake[size]=batchLake[size]+re[i].get()
def get_next_batch(batch_size=64):
startTime = datetime.datetime.now()
global batchLake
if(batch_size not in batchLake.keys()):
batchLake[batch_size] =[]
print(batchLake[batch_size])
try:
_thread.start_new_thread ( spawn_n_processes, (batch_size,) )
except Exception:
print(Exception)
leng=len(batchLake[batch_size])
while(leng<=0):
leng = len(batchLake[batch_size])
time.sleep(0.1)
#print("part1耗时:" + str(datetime.datetime.now() - startTime))
temp=batchLake[batch_size][0]
del batchLake[batch_size][0]
print("获取batch耗时:"+str(datetime.datetime.now() - startTime))
print("batch池"+str(batch_size)+"余量:"+str(leng-1))
return temp[0],temp[1]
# 定义CNN
def crack_captcha_cnn(w_alpha=0.01, b_alpha=0.1):
x = tf.reshape(X, shape=[-1, IMAGE_HEIGHT, IMAGE_WIDTH, 1])
w_c1 = tf.Variable(w_alpha * tf.random_normal([3, 3, 1, 32]))
b_c1 = tf.Variable(b_alpha * tf.random_normal([32]))
conv1 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(x, w_c1, strides=[1, 1, 1, 1], padding='SAME'), b_c1))
conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv1 = tf.nn.dropout(conv1, keep_prob=keep_prob)
w_c2 = tf.Variable(w_alpha * tf.random_normal([3, 3, 32, 64]))
b_c2 = tf.Variable(b_alpha * tf.random_normal([64]))
conv2 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv1, w_c2, strides=[1, 1, 1, 1], padding='SAME'), b_c2))
conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv2 = tf.nn.dropout(conv2, keep_prob=keep_prob)
w_c3 = tf.Variable(w_alpha * tf.random_normal([3, 3, 64, 64]))
b_c3 = tf.Variable(b_alpha * tf.random_normal([64]))
conv3 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv2, w_c3, strides=[1, 1, 1, 1], padding='SAME'), b_c3))
conv3 = tf.nn.max_pool(conv3, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
conv3 = tf.nn.dropout(conv3, keep_prob=keep_prob)
w_d = tf.Variable(w_alpha * tf.random_normal([8 * 20 * 64, 1024]))
b_d = tf.Variable(b_alpha * tf.random_normal([1024]))
dense = tf.reshape(conv3, [-1, w_d.get_shape().as_list()[0]])
dense = tf.nn.relu(tf.add(tf.matmul(dense, w_d), b_d))
dense = tf.nn.dropout(dense, keep_prob=keep_prob)
w_out = tf.Variable(w_alpha * tf.random_normal([1024, MAX_CAPTCHA * CHAR_SET_LEN]))
b_out = tf.Variable(b_alpha * tf.random_normal([MAX_CAPTCHA * CHAR_SET_LEN]))
out = tf.add(tf.matmul(dense, w_out), b_out)
return out
# 训练
def train_crack_captcha_cnn():
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
# 具体的代码
output = crack_captcha_cnn()
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=output, labels=Y)) # 计算损失
optimizer = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss) # 计算梯度
predict = tf.reshape(output, [-1, MAX_CAPTCHA, CHAR_SET_LEN]) # 目标预测
max_idx_p = tf.argmax(predict, 2) # 目标预测最大值
max_idx_l = tf.argmax(tf.reshape(Y, [-1, MAX_CAPTCHA, CHAR_SET_LEN]), 2) # 真实标签最大值
correct_pred = tf.equal(max_idx_p, max_idx_l)
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32)) # 准确率
saver = tf.train.Saver()
with tf.Session(config=config) as sess:
sess.run(tf.global_variables_initializer())
step = 0
while True:
batch_x, batch_y = get_next_batch(64)
_, loss_ = sess.run([optimizer, loss], feed_dict={X: batch_x, Y: batch_y, keep_prob: 0.75})
print(step, loss_)
if step % 100 == 0 and step!=0 :
batch_x_test, batch_y_test = get_next_batch(100)
acc = sess.run(accuracy, feed_dict={X: batch_x_test, Y: batch_y_test, keep_prob: 1.})
print(step, "准确率:",acc)
if acc > 0.93:
saver.save(sess, "D:\\DL\\models\\85", global_step=step)
step += 1
def crack_captcha(captcha_image, output):
saver = tf.train.Saver()
with tf.Session() as sess:
sess.run(tf.initialize_all_variables())
# 获取训练后的参数
checkpoint = tf.train.get_checkpoint_state("D:\\DL\\models")
if checkpoint and checkpoint.model_checkpoint_path:
saver.restore(sess, checkpoint.model_checkpoint_path)
print("Successfully loaded:", checkpoint.model_checkpoint_path)
else:
print("Could not find old network weights")
predict = tf.argmax(tf.reshape(output, [-1, MAX_CAPTCHA, CHAR_SET_LEN]), 2)
text_list = sess.run(predict, feed_dict={X: [captcha_image], keep_prob: 1})
text = vec2text(text_list)
return text
if __name__ == '__main__':
#global IMAGE_HEIGHT
#global IMAGE_WIDTH
#global MAX_CAPTCHA
#global CHAR_SET_LEN
#global number
#global alphabet
#global ALPHABET
#global batchLake
train = 0 # 0: 训练 1: 预测
path = os.path.abspath(os.path.dirname(__file__))
type = sys.getfilesystemencoding()
sys.stdout = Logger()
batchLake={}
if train == 0:
number = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
'u',
'v', 'w', 'x', 'y', 'z']
ALPHABET = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
'U',
'V', 'W', 'X', 'Y', 'Z']
text, image = gen_captcha_text_and_image()
print("验证码图像channel:", image.shape)
# 图像大小
IMAGE_HEIGHT = 60
IMAGE_WIDTH = 160
MAX_CAPTCHA = len(text)
print("验证码文本最长字符数", MAX_CAPTCHA)
# 文本转向量
char_set = number + alphabet + ALPHABET + ['_'] # 如果验证码长度小于4, '_'用来补齐
CHAR_SET_LEN = len(char_set)
# placeholder占位符
X = tf.placeholder(tf.float32, [None, IMAGE_HEIGHT * IMAGE_WIDTH])
Y = tf.placeholder(tf.float32, [None, MAX_CAPTCHA * CHAR_SET_LEN])
keep_prob = tf.placeholder(tf.float32)
train_crack_captcha_cnn()
# 预测时需要将训练的变量初始化
if train == 1:
# 自然计数
step = 0
# 正确预测计数
rightCnt = 0
# 设置测试次数
count = 10
number = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
'u', 'v', 'w', 'x', 'y', 'z']
ALPHABET = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
'U', 'V', 'W', 'X', 'Y', 'Z']
IMAGE_HEIGHT = 60
IMAGE_WIDTH = 160
char_set = number + alphabet + ALPHABET + ['_']
CHAR_SET_LEN = len(char_set)
MAX_CAPTCHA = 4
# placeholder占位符
X = tf.placeholder(tf.float32, [None, IMAGE_HEIGHT * IMAGE_WIDTH])
Y = tf.placeholder(tf.float32, [None, MAX_CAPTCHA * CHAR_SET_LEN])
keep_prob = tf.placeholder(tf.float32)
output = crack_captcha_cnn()
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
saver = tf.train.Saver()
with tf.Session(config=config) as sess:
sess.run(tf.global_variables_initializer())
# 获取训练后参数路径
checkpoint = tf.train.get_checkpoint_state("D:\\DL\\models")
if checkpoint and checkpoint.model_checkpoint_path:
saver.restore(sess, checkpoint.model_checkpoint_path)
print("Successfully loaded:", checkpoint.model_checkpoint_path)
else:
print("Could not find old network weights.")
while True:
text, image = gen_captcha_text_and_image()
f = plt.figure()
ax = f.add_subplot(111)
ax.text(0.1, 0.9,text, ha='center', va='center', transform=ax.transAxes)
plt.imshow(image)
plt.show()
image = convert2gray(image)
image = image.flatten() / 255
predict = tf.argmax(tf.reshape(output, [-1, MAX_CAPTCHA, CHAR_SET_LEN]), 2)
text_list = sess.run(predict, feed_dict={X: [image], keep_prob: 1})
predict_text = vec2text(text_list)
predict_text = crack_captcha(image, output)
print("step:{} 真实值: {} 预测: {} 预测结果: {}".format(str(step), text, predict_text,
"正确" if text.lower() == predict_text.lower() else "错误"))
if text.lower() == predict_text.lower():
rightCnt += 1
if step == count - 1:
print("测试总数: {} 测试准确率: {}".format(str(count), str(rightCnt / count)))
break
step += 1