python – 将来自Twisted`enterprise.adbapi`的查询添加到`twistd`守护进程创建的reactor循环中
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了python – 将来自Twisted`enterprise.adbapi`的查询添加到`twistd`守护进程创建的reactor循环中,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3531字,纯文字阅读大概需要6分钟。
内容图文
![python – 将来自Twisted`enterprise.adbapi`的查询添加到`twistd`守护进程创建的reactor循环中](/upload/InfoBanner/zyjiaocheng/964/9fd403c960d9450992550d4c0ddf04e1.jpg)
我在Twisted .tac插件中使用twisted.enterprise.adbapi,并且发现除非调用reactor.(run),否则为aConnectionPool.runQuery(sqlQuery)等函数返回的延迟对象不会触发.如何将查询添加到twistd创建的reactor循环而不是调用reactor.run()?它是一般程序还是异步数据库API特有的?
编辑 – 附上代码:
from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer
from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
import json
import base64
from twisted.enterprise import adbapi
class StringProducer(object):
implements(IBodyProducer)
def __init__(self, body):
self.body = body
self.length = len(body)
def startProducing(self, consumer):
consumer.write(self.body)
return succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass
def httpRequest(url, values, headers={}, method='POST'):
agent = Agent(reactor)
d = agent.request(method,
url,
Headers(headers),
StringProducer(values)
)
def handle_response(response):
if response.code == 204:
d = defer.succeed('')
else:
class SimpleReceiver(protocol.Protocol):
def __init__(s, d):
s.buf = ''; s.d = d
def dataReceived(s, data):
s.buf += data
response = json.loads(data)
receipt = response[u'receipt']
if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
transactionID = receipt[u'original_transaction_id']
date = receipt[u'original_purchase_date']
purchaseDate = date.strip(' Etc/GMT')
print transactionID
print purchaseDate
dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user', passwd='passwd')
dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))
def finishInsert(dObject, pool):
print 'inserted!'
pool.close()
dOperation.addCallback(finishInsert, dbpool)
def insertError(dObject):
print 'insert error!'
dOperation.addErrback(insertError)
def connectionLost(s, reason):
s.d.callback(s.buf)
d = defer.Deferred()
response.deliverBody(SimpleReceiver(d))
return d
d.addCallback(handle_response)
class StoreServer(protocol.Protocol):
def dataReceived(self, data):
a = data.split(':delimiter:')
if a[0] == 'addToUserList':
receiptBase64 = base64.standard_b64encode(a[1])
jsonReceipt = json.dumps({'receipt-data':receiptBase64})
httpRequest(
"https://buy.itunes.apple.com/verifyReceipt",
jsonReceipt,
{'Content-Type': ['application/x-www-form-urlencoded']}
)
application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)
解决方法:
您的代码为它发出的每个请求创建一个新的ConnectionPool.新的ConnectionPool创建自己的新线程池以运行查询,并且必须建立与数据库的新连接.
这意味着您实际上没有连接池.您只需创建并使用一次连接即可.此外,errback,insertError,不会关闭池.
这些结合在一起意味着可以一次创建的线程/连接数没有限制,除了系统对可以分配的内存量或者可以打开多少个套接字所施加的限制.当你遇到其中一个限制时,事情就不会很好了.
它还意味着每个查询错误都会泄漏一些线程和连接(ConnectionPool在启动时会设置3个线程/连接).在出现足够的错误后,您将无法再创建任何线程或连接,因此您将无法再查询数据库.您的查询很简单,您可能认为错误的可能性不大,但MySQL往往会在某种程度上随机断开客户端(也许您至少有点意识到这一点,因为您确实添加了errback来报告失败).
ConnectionPool的用途是创建一个(或两个,或其他一些小的固定数字),然后为所有查询重新使用它.这些问题是否与您最初观察到的问题有关,我不知道,但它们可能是您应该解决的问题.
内容总结
以上是互联网集市为您收集整理的python – 将来自Twisted`enterprise.adbapi`的查询添加到`twistd`守护进程创建的reactor循环中全部内容,希望文章能够帮你解决python – 将来自Twisted`enterprise.adbapi`的查询添加到`twistd`守护进程创建的reactor循环中所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。