-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanalysis.py
More file actions
126 lines (106 loc) · 5.85 KB
/
analysis.py
File metadata and controls
126 lines (106 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import polars as pl
import os
class RawConverter:
def __init__(self, rawfile, outputfile,loadcell_max_lbf,loggertype):
self.rawfile = rawfile
self.outputfile = outputfile
self.loadcell_max_lbf = loadcell_max_lbf
self.loggertype = loggertype
# --- 定数定義 ---
self.VOLTAGE_REF_5V = 4.99
self.R0 = 10000
self.T0 = 25 + 273.15
self.Tempnew_B = 4126
self.Temp_B = 3435
self.HighTemp_R = 10000
self.LowTemp_R = 1000
# --- 計算ロジック(エクスプレッションを返す関数) ---
# これにより、可読性とオブジェクト指向的なモジュール性を維持します
def adc_voltage(self,col_name):
"""バイナリ値を電圧に変換する計算式"""
return (self.VOLTAGE_REF_5V * pl.col(col_name)) / 4095.0
def thrust(self,col_name, max_lbf):
"""推力の計算式を返す"""
if max_lbf == 250:
return 0.5807 * pl.col(col_name) - 24.098
elif max_lbf == 500:
return 1.2836 * pl.col(col_name) - 49.93
else: # 1000
return 2.3896 * pl.col(col_name) - 115.47
def pressure(self,col_name):
"""圧力の計算式を返す"""
v = self.adc_voltage(col_name)
return 2.0 * v * 1e6
def temp(self,col_name, R, B):
"""サーミスタ温度の計算式を返す"""
v = self.adc_voltage(col_name)
# ゼロ除算回避(vがVOLTAGE_REF_5Vに近い場合)
v_safe = pl.when(v >= self.VOLTAGE_REF_5V).then(self.VOLTAGE_REF_5V - 1e-6).otherwise(v)
rt = R * v_safe / (self.VOLTAGE_REF_5V - v_safe)
# サーミスタ計算式
# T = 1 / (1/B * ln(Rt/R0) + 1/T0) - 273.15
t_kelvin = 1 / ((1 / B) * (rt / self.R0).log() + (1 / self.T0))
return t_kelvin - 273.15
# --- メイン処理 ---
def convert(self):
if self.loggertype == "new":
self.convert_new()
elif self.loggertype == "old":
self.convert_old()
def create_light(self):
out_dir = os.path.dirname(self.outputfile)
light_rawfile = os.path.join(out_dir, "LOG_light.csv")
has_hdr = (self.loggertype == "new")
df = pl.read_csv(self.rawfile, has_header=has_hdr, ignore_errors=True)
df_light = df.gather_every(100)
df_light.write_csv(light_rawfile, include_bom=True)
def convert_old(self):
# 1. 読み込み(必要な列だけインデックスで指定)
# 元のコードの row[0], row[1], ... に対応
col_names = [f"column_{i}" for i in range(20)] # 一旦仮名
# scan_csvを使うと「遅延評価」になり、さらに最適化されます
df = pl.read_csv(self.rawfile, has_header=False, new_columns=col_names)
# 2. 変換処理(ここですべての計算を一気に定義)
# Polarsはこれを解析してマルチスレッドで並列実行します
df_result = df.select([
pl.col("column_0").alias("書き込み待ちデータ数"),
pl.col("column_1").alias("データ取得開始時"),
pl.col("column_2").alias("データ取得終了時"),
self.thrust("column_3", self.loadcell_max_lbf).alias("推力[N]"),
self.pressure("column_5").alias("圧力1[Pa]"),
self.pressure("column_6").alias("圧力2[Pa]"),
self.pressure("column_7").alias("圧力3[Pa]"),
self.pressure("column_8").alias("圧力4[Pa]"),
self.temp("column_11", self.LowTemp_R, self.Temp_B).alias("低域温度1[℃]"),
self.temp("column_12", self.LowTemp_R, self.Temp_B).alias("低域温度2[℃]"),
self.temp("column_13", self.LowTemp_R, self.Tempnew_B).alias("低域温度3[℃]"),
self.temp("column_14", self.HighTemp_R, self.Temp_B).alias("高域温度1[℃]"),
self.temp("column_15", self.HighTemp_R, self.Temp_B).alias("高域温度2[℃]"),
pl.col("column_19").alias("バルブ")
])
# 3. 書き出し
df_result.write_csv(self.outputfile, include_bom=True)
def convert_new(self):
# 1. 読み込み(必要な列だけインデックスで指定)
col_names = [f"column_{i}" for i in range(18)] # 一旦仮名
# scan_csvを使うと「遅延評価」になり、さらに最適化されます
df = pl.read_csv(self.rawfile, has_header=True, new_columns=col_names)
# 2. 変換処理(ここですべての計算を一気に定義)
# Polarsはこれを解析してマルチスレッドで並列実行します
df_result = df.select([
pl.col("column_0").alias("データ取得開始時"),
pl.col("column_1").alias("データ取得終了時"),
self.thrust("column_2", self.loadcell_max_lbf).alias("推力[N]"),
self.pressure("column_4").alias("圧力1[Pa]"),
self.pressure("column_5").alias("圧力2[Pa]"),
self.pressure("column_6").alias("圧力3[Pa]"),
self.pressure("column_7").alias("圧力4[Pa]"),
self.temp("column_10", self.HighTemp_R, self.Temp_B).alias("高域温度1[℃]"),
self.temp("column_11", self.HighTemp_R, self.Temp_B).alias("高域温度2[℃]"),
self.temp("column_12", self.HighTemp_R, self.Temp_B).alias("高域温度3[℃]"),
self.temp("column_13", self.LowTemp_R, self.Temp_B).alias("低域温度1[℃]"),
self.temp("column_14", self.LowTemp_R, self.Temp_B).alias("低域温度2[℃]"),
self.temp("column_15", self.LowTemp_R, self.Temp_B).alias("低域温度3[℃]")
])
# 3. 書き出し
df_result.write_csv(self.outputfile, include_bom=True)