令 E 定义构成时间序列数据的 k 个唯一事件集。例如,时间序列可能由以下三个基本且唯一的事件组成,这些事件表示在离散时间步长上绘制数据时观察到的路径轨迹类型:向下、持平和向上。令 S 定义长度为 n(表示离散时间步长)的序列,该序列由E 中定义的事件组成,表示部分或全部数据。例如,序列 [向上、向下、向上、持平、向上] 表示五个时间步长的数据。
元素 A(i, j) 表示事件序列中某个时间步 t 的事件 i 后面跟着时间步 t+1 的事件 j 的次数; i 和 j 分别是行索引和列索引。请注意,行表示从上到下、从上到下的顺序的事件,而列从左到右表示相同的事件。例如,A 的左上角元素表示在给定的事件序列中,上事件后紧跟着另一个上事件两次。 A 的中右元素表示在事件序列中,平事件之后紧接着下事件。等等。
import pandas as pd
def get_transition_tuples(ls):
return [(ls[i-1], ls[i]) for i in range(1, len(ls))]
def get_transition_event(tup):
transition_event = 'flat'
if tup[0] < tup[1]:
transition_event = 'up'
if tup[0] > tup[1]:
transition_event = 'down'
return transition_event
ls_raw_time_series = [1, 2, -2, -1, 0, 0, 2, 2, 1, 2, 3]
ls_transitions = get_transition_tuples(ls_raw_time_series)
ls_events = [get_transition_event(tup) for tup in ls_transitions]
ls_event_transitions = get_transition_tuples(ls_events)
ls_index = ['up', 'flat', 'down']
df = pd.DataFrame(0, index=ls_index, columns=ls_index)
for i, j in ls_event_transitions:
df[j][i] += 1
df_rnorm = df.div(df.sum(axis=1), axis=0).fillna(0.00)
df_cnorm = df.div(df.sum(axis=0), axis=1).fillna(0.00)
这应该产生以下转换矩阵:
>>> df
up flat down
up 2 2 1
flat 1 0 1
down 2 0 0
>>> df_rnorm
up flat down
up 0.4 0.4 0.2
flat 0.5 0.0 0.5
down 1.0 0.0 0.0
>>> df_cnorm
up flat down
up 0.4 1.0 0.5
flat 0.2 0.0 0.5
down 0.4 0.0 0.0
可视化转移状态
def get_df_edgelist(df, ls_index):
edgelist = []
for i in ls_index:
for j in ls_index:
edgelist.append([i, j, df[j][i]])
return pd.DataFrame(edgelist, columns=['src', 'dst', 'weight'])
def edgelist_to_digraph(df_edgelist):
g = Digraph(format='jpeg')
g.attr(rankdir='LR', size='30')
g.attr('node', shape='circle')
nodelist = []
for _, row in df_edgelist.iterrows():
node1, node2, weight = [str(item) for item in row]
if node1 not in nodelist:
g.node(node1, **{'width': '1', 'height': '1'})
nodelist.append(node1)
if node2 not in nodelist:
g.node(node2, **{'width': '1', 'height': '1'})
nodelist.append(node2)
g.edge(node1, node2, label=weight)
return g
def render_graph(fname, df, ls_index):
df_edgelist = get_df_edgelist(df, ls_index)
g = edgelist_to_digraph(df_edgelist)
g.render(fname, view=True)