龙空技术网

[Python]使用状态机实现自动交易策略

暖冬2000 89

前言:

而今你们对“pythonstmtime”大体比较珍视,同学们都需要剖析一些“pythonstmtime”的相关文章。那么小编在网摘上网罗了一些对于“pythonstmtime””的相关文章,希望我们能喜欢,姐妹们一起来了解一下吧!

状态机模型简介

状态机(State Machine)是一种计算模型,它描述了系统的行为和状态之间的转换。用通俗一些的方式来说,状态机可以被看作是一个系统的“说明书”,它描述了这个系统所有可能的状态和在状态之间转移的规则。我们可以把状态机想象成一个游戏,游戏中有不同的关卡和过关规则。玩家需要按照规则完成关卡,才能进入下一个关卡。

在状态机中,一个系统可以处于多个状态中的一个状态。而系统状态的转移则表示从一个状态转移到另一个状态的过程。这个过程通常是据特定条件或事件触发的,就像在游戏中通过完成任务或收集道具进入下一个关卡一样。

状态机在计算机网络、通信、自动控制中应用广泛。使用状态机模型实现的程序与过程化的程序相比有很多优点。状态机程序具有非常清晰的结构,它通过状态和转移描述了系统的行为,因此,状态机程序的逻辑更容易实现,相比而言,过程化程序则是基于过程和函数的执行,需要通过条件语句和控制结构来实现相同的逻辑。所以,状态机程序的状态和转移非常容易理解,因此代码的具有更好的可读性和可维护性。

一个日内冲高回落卖出再回落买回的示例,原理

我们用一个冲高回落卖出再回落买回的自动化交易策略看看是如何使用状态机模型来实现的。我们先对冲高回落卖出做个定义:以初始价(例如昨日收盘价)为基准,当涨幅超过阈值1后,当价格从最高点回落超过阈值2后,按比例卖出持仓证券,当卖出成功后股价回落幅度超过阈值,补回仓位。

在这个过程中有五个主要状态,

状态s0:初始状态,该状态为等待上涨状态,即按当前价格计算的涨幅未达到涨幅条件即阈值1. 系统该状态下,根据股价计算涨幅,并进行判断,如果涨幅超过阈值,则进入状态s1; 否则,保持状态不变。状态s1: 该状态为冲高待卖出状态。该状态下,系统根据当前股价进行如下操作,当股价大于最高价时,将更新最高价为当前股价。当股价低于最高价时,如果股价与最高价的比值低于阈值2,则下单卖出,状态变为s2; 如果高于阈值2,则状态不变。状态s2: 该状态为冲高回落已卖出状态。该状态下,如果委托全部成交,进入状态s3,否则,状态不变。状态s3: 该状态为等待回落状态。该状态下,系统跟踪股价与卖出价的比值,当比值低于阈值3,即从高点回落超过阈值,进入状态s4.状态s4: 该状态为回落后待反弹买回状态。该状态下根据股价更新从本状态开始后的最低价。当股价与最低价比值高于阈值4时,下单买回原仓位,进入状态S5.状态s5: 该状态为回落卖出待确认状态。该状态下,如果委托全部成交,进入状态s0,否则,状态不变。

状态之间的转移条件都非常简单,状态转移图我就不(懒得)画了。

代码示例

接下来是python代码,代码是由从本人实际运行代码删减而来,缺少相关的函数和环境,无法直接运行。为了便于阅读理解,代码中还删除了一些重要的细节处理部分,因此代码仅供交流学习,不要直接使用。

状态机类StateMachine源代码:

import timefrom autotrader.trader import Traderimport logging'''自动交易状态机'''class StateMachine:    strategy_params = None    '''    突破卖出策略状态机    '''    def __init__(self,code:str,postion:int,trader:Trader,logger=None):        '''        初始化函数        :code: 证券代码        :postion: 初始持仓        :trader: 交易接口实例        '''        self.code = code        self.position = postion        self.trader = trader        self.stmFunc = self.s0        self.strategy = None        self.counter = {}        self._observe_price = -1        self.basePrice = -1        self.currHighPrice = -1        self.orders = []        self.messages = []        self.currprice = 0        self.yesterday_close = 0        if logger:            self.logger = logger        else:            self.logger = logging.getLogger("autotrader")   		'''		quoteData:行情报价    '''    def process(self,quoteData):        try:            if quoteData and self.code in quoteData:                data = quoteData[self.code]                if not self.yesterday_close:                    self.yesterday_close = data["yesterday_close"]                                if data["price"]>0:                     if not self.strategy:#初始化策略参数                        strategy1 = {}                        for param in StateMachine.strategy_params["default"]:                            if in_range(data["price"],param["price"]) and in_range(data["premium_rate"],param["premium_rate"]):                                strategy1 = param                                break                                                strategy2 = {}                        if self.code in StateMachine.strategy_params["params"]:                            strategy2 = StateMachine.strategy_params["params"][self.code]                        self.strategy = {**strategy1,**strategy2}                        if not self.strategy and self.stmFunc!=self.s_exception:                                self.writeLog(logging.INFO,"没有符合条件的策略!",0,0)                                self.stmFunc = self.s_exception                    self.currQuote = data                    self.stmFunc = self.stmFunc(quoteData)                    messages = self.messages                    self.messages = []                    return messages            return []        except Exception as e:            self.stmFunc = self.s_exception            traceback.print_exc()            return ["异常:{}".format(str(e))]        def resetCounter(self,counterid):        initialCount = {"s2":60,"s5":60}        self.counter[counterid] = initialCount[counterid]        def decCount(self,counterid):        self.counter[counterid]-=1        return self.counter[counterid]        def s0(self,quoteData):        '''        状态s0: 初始状态        :quoteData: 行情数据        :返回: 更新的状态机状态        '''        data =  quoteData[self.code]        currPrice = data["price"]        if self.basePrice<0:            self.basePrice = data["yesterday_close"]        if self.observePrice <=0:            self.observePrice = self.basePrice*(1+self.strategy["thresh_high"])                if currPrice >=self.observePrice:            percent = currPrice/self.basePrice - 1            mess = "价格{:.2f}({:.2%})超过目标价".format(currPrice,percent)            print(mess)                        self.writeLog(logging.INFO,mess,self.observePrice,data["price"])            self.observePrice = self.currHighPrice - (self.currHighPrice - self.basePrice)*self.strategy["thresh_sell"]            return self.s1                return self.s0        def s1(self,quoteData):        '''        状态s1: 突破状态        :参数        :quoteData: 行情数据        :返回: 更新的状态机状态        '''        data =  quoteData[self.code]        currPrice = data["price"]                if currPrice>self.currHighPrice:             self.currHighPrice = currPrice            self.observePrice = currPrice - (currPrice - self.basePrice)*self.strategy["thresh_sell"]            return self.s1                if currPrice<=self.observePrice:            quantity = max(self.position - 100,100)            orderprice = currPrice - 0.5            orderId = self.trader.sell(self.code,orderprice,quantity,False)                        if orderId:                order = {"orderId":orderId,"order_quantity":quantity,"price":orderprice}                self.orders.append(order)                self.resetCounter("s2")                                print("{code}从最高{high:.2f}回落低于{thresh:.2f},卖出[价格:{price:.2f},数量:{quantity},订单号:{orderId}]".format(code=self.code,high=self.currHighPrice,thresh=self.observePrice,price=orderprice,quantity=quantity,orderId=orderId))                self.logger.info("{code}从最高{high:.2f}回落低于{thresh:.2f},卖出[价格:{price:.2f},数量:{quantity},订单号:{orderId}]".format(code=self.code,high=self.currHighPrice,thresh=self.observePrice,price=orderprice,quantity=quantity,orderId=orderId))                mess = "回落卖出,价格:{price:.2f},数量:{quantity}".format(price=orderprice,quantity=quantity)                self.writeLog(logging.INFO,mess,self.observePrice,data["price"])                self.observePrice = (self.currHighPrice-self.basePrice)*self.strategy["thresh_fall"]+self.basePrice                return self.s2            else:                print("下单异常,转入异常状态.")                return self.s_exception                return self.s1        def s2(self,quoteData):        '''        状态s2: 回落卖出待成交        :参数        :quoteData: 行情数据        :返回: 更新的状态机状态        '''                if self.decCount("s2")==0:            self.resetCounter("s2")            order = self.orders[-1]            deal = self.trader.getDeal(order["orderId"])            if deal and deal["finished_quantity"] == order["order_quantity"]:                self.writeLog(logging.INFO,"委托已成交",self.observePrice,deal["price"])                order.update(deal)                return self.s3                return self.s2    def s3(self,quoteData):        '''        状态s3: 回落等待向下突破        :参数        :quoteData: 行情数据        :返回: 更新的状态机状态        '''                data =  quoteData[self.code]                if data["price"]<=self.observePrice:            self.writeLog(logging.INFO,"价格向下突破阈值",self.observePrice,data["price"])            self.observePrice = data["price"]*(1+self.strategy["thresh_buy"])            return self.s4                return self.s3    def s4(self,quoteData):        '''        状态s4: 回落突破        :参数        :quoteData: 行情数据        :返回: 更新的状态机状态        '''                data =  quoteData[self.code]        thresh = self.strategy["thresh_buy"]        currPrice = data["price"]        if currPrice>self.observePrice:            quantity = max(self.position - 10,10)            sellorder = self.orders[-1]            orderprice = currPrice/(1+self.strategy["thresh_buy"])            orderId = self.trader.buy(self.code,orderprice,quantity)            if orderId:                order = {"orderId":orderId,"order_quantity":quantity,"price":orderprice}                self.orders.append(order)                self.resetCounter("s5")                print("{code}探底回升超过{thresh:.2f},买回[价格:{price:.2f},数量:{quantity},订单号:{orderId}]".format(code=self.code,thresh=self.observePrice,price=orderprice,quantity=quantity,orderId=orderId))                self.logger.info("{}探底回升超过{thresh:.2%},买回[价格:{price:.2f},数量:{quantity},订单号:{orderId}]".format(self.code,thresh=self.observePrice,price=orderprice,quantity=quantity,orderId=orderId))                self.writeLog(logging.INFO,"探底回升超过阈值,买入",self.observePrice,orderprice)                return self.s5            else:                print("下单异常,转入异常状态.")                self.writeLog(logging.INFO,"探底回升超过阈值,买入下单异常",self.observePrice,orderprice)                return self.s_exception                observprice = data["price"]*(1+thresh)        if observprice<self.observePrice:            self.observePrice = observprice                return self.s4        def s5(self,quoteData):        '''        状态s4: 回落买回待成交状态        :参数        :quoteData: 行情数据        :返回: 更新的状态机状态        '''                if self.decCount("s5")==0:            self.resetCounter("s5")            order = self.orders[-1]            deal = self.trader.getOrder(order["orderId"])            if deal and deal["finished_quantity"] == order["order_quantity"]:                order.update(deal)                self.basePrice = order["price"]                self.currHighPrice = -1                self.observePrice = -1                return self.s0                return self.s5     def s_exception(self,quoteData):        '''            出现不可处理的异常,进入该状态        '''        return self.s_exception    '''			以下都是些辅助函数    '''    def __str__(self):        return "状态{}:{}".format(self.stmFunc.__name__,self.state_literal())        def state_literal(self):        statusName = {            "s0":"初始",            "s1":"向上突破",            "s2":"回落卖出(待成)",            "s3":"回落卖出(已成)",            "s4":"向下突破",            "s5":"止跌买回(待成)",            "s_exception":"异常"        }        return statusName[self.stmFunc.__name__]    def state(self):        return self.stmFunc.__name__        def writeLog(self,level,message,obeservePrice,price):        strtime = time.strftime("%H:%M:%S")        self.logger.log(level,message)        self.log.append({"message":message,"level":level,"time":strtime,"price":price,"obeservePrice":obeservePrice,})         @property    def observePrice(self):        return self._observe_price    @observePrice.setter    def observePrice(self,price):        if price!=self._observe_price:            print("{}的目标价设置为:{:.2f}({})".format(self.code,price,self.state()))            self._observe_price = price        return self._observe_price

在主程序中为每个执行此策略的证券标的创建一个StateMachine类,在主循环中获取行情数据,调用StatMachine的process函数即可,具体代码这里就省略了。

以上就是一个简单的日内交易状态机模型。之前写过基于过程的同样功能的程序,里面的逻辑判断很多,程序易读性差,修改起来也很麻烦,后来就改成了状态机模型,程序清爽多了,维护起来也很方便。

标签: #pythonstmtime