File size: 10,251 Bytes
d03866e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""
Moirai model for anomaly detection using zero-shot forecasting.
Adapted from test_anomaly.py approach for TSB-AD framework.
"""

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader
import warnings
warnings.filterwarnings('ignore')

from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split
from uni2ts.model.moirai import MoiraiForecast, MoiraiModule

from .base import BaseDetector
from ..utils.dataset import MoiraiWindowedDataset


class Moirai(BaseDetector):
    def __init__(self, 
                 win_size=96,
                 model_path="Salesforce/moirai-1.0-R-small",
                 num_samples=100,
                 device='cuda:0',
                 use_score=False,
                 threshold=0.5):
        """
        Initialize Moirai anomaly detector.
        
        Args:
            win_size (int): Window size for context and prediction
            model_path (str): Path to pretrained Moirai model
            num_samples (int): Number of forecast samples
            device (str): Device to run model on
            use_score (bool): Whether to use raw scores or threshold
            threshold (float): Threshold for binary classification
        """
        self.model_name = 'Moirai'
        self.win_size = win_size
        self.model_path = model_path
        self.num_samples = num_samples
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
        self.use_score = use_score
        self.threshold = threshold
        self.decision_scores_ = None

    def fit(self, data):
        """
        Fit the Moirai model and compute anomaly scores.
        
        Args:
            data: Input time series data (1D or 2D numpy array)
        """
        try:
            # Ensure data is in the right format
            if data.ndim == 1:
                data = data.reshape(-1, 1)
            
            print(f"Moirai: Processing data with shape {data.shape}")
            
            # Create windowed dataset following test_anomaly.py pattern
            dataset = MoiraiWindowedDataset(
                data=data, 
                win_size=self.win_size, 
                step=self.win_size,  # Non-overlapping windows
                normalize=False  # Let Moirai handle normalization
            )
            
            print(f"Moirai: Created {len(dataset)} windows")
            
            if len(dataset) == 0:
                print("Warning: No valid windows created. Data might be too short.")
                self.decision_scores_ = np.zeros(len(data))
                return
            
            # Process each window using DataLoader (similar to test_anomaly.py)
            data_loader = DataLoader(
                dataset=dataset,
                batch_size=1,
                shuffle=False,
                drop_last=False
            )
            
            all_predictions = []
            all_targets = []
            
            for i, (context, target) in enumerate(data_loader):
                # Process single window following test_anomaly.py pattern
                scores = self._process_window(context.squeeze(0).numpy(), target.squeeze(0).numpy(), i)
                all_predictions.append(scores)
                all_targets.append(target.squeeze(0).numpy())
            
            # Combine all predictions
            if all_predictions:
                # Concatenate predictions along time dimension
                combined_predictions = np.concatenate(all_predictions, axis=0)
                combined_targets = np.concatenate(all_targets, axis=0)
                
                # Compute anomaly scores as prediction error
                if combined_targets.ndim == 1:
                    anomaly_scores = (combined_targets - combined_predictions) ** 2
                else:
                    # For multivariate, use mean squared error across features
                    anomaly_scores = np.mean((combined_targets - combined_predictions) ** 2, axis=1)
                
                # Pad scores to match original data length
                self.decision_scores_ = self._pad_scores_to_original_length(
                    anomaly_scores, len(data), dataset.get_window_info()
                )
            else:
                print("Warning: No predictions generated")
                self.decision_scores_ = np.zeros(len(data))
                
        except Exception as e:
            print(f"Error in Moirai.fit(): {str(e)}")
            import traceback
            traceback.print_exc()
            self.decision_scores_ = np.zeros(len(data))

    def _process_window(self, context, target, window_index):
        """
        Process a single window following the test_anomaly.py approach.
        
        Args:
            context: Context data for the window (win_size, n_features)
            target: Target data for the window (win_size, n_features) 
            window_index: Index of the current window
            
        Returns:
            predictions: Forecasted values for the target period
        """
        try:
            # Ensure 2D shape
            if context.ndim == 1:
                context = context.reshape(-1, 1)
            if target.ndim == 1:
                target = target.reshape(-1, 1)
            
            # Combine context and target for full window (following test_anomaly.py)
            full_window = np.vstack([context, target])
            
            # Create DataFrame
            feature_df = pd.DataFrame(full_window)
            if feature_df.shape[1] == 1:
                feature_df.columns = ['target']
                target_col = 'target'
                feature_cols = []
            else:
                feature_df.columns = [f'feature_{i}' for i in range(feature_df.shape[1])]
                target_col = 'feature_0'  # Use first feature as target
                feature_cols = [f'feature_{i}' for i in range(1, feature_df.shape[1])]
            
            # Add timestamp and unique_id
            timestamp_range = pd.date_range(
                start=pd.Timestamp('2023-01-01 10:00:00'), 
                periods=len(feature_df), 
                freq='T'
            )
            feature_df.index = timestamp_range
            feature_df['unique_id'] = window_index
            
            # Create GluonTS dataset
            moirai_df = feature_df.reset_index().rename(columns={'index': 'timestamp'})
            
            if feature_cols:
                ds = PandasDataset.from_long_dataframe(
                    moirai_df,
                    target=target_col,
                    item_id="unique_id",
                    timestamp="timestamp",
                    feat_dynamic_real=feature_cols,
                )
            else:
                ds = PandasDataset.from_long_dataframe(
                    moirai_df,
                    target=target_col,
                    item_id="unique_id",
                    timestamp="timestamp",
                )
            
            # Split dataset (following test_anomaly.py)
            test_size = self.win_size
            _, test_template = split(ds, offset=-test_size)
            
            test_data = test_template.generate_instances(
                prediction_length=self.win_size,
                windows=1,
                distance=self.win_size,
                max_history=self.win_size,
            )
            
            # Create Moirai model
            model = MoiraiForecast(
                module=MoiraiModule.from_pretrained(self.model_path),
                prediction_length=self.win_size,
                context_length=self.win_size,
                patch_size="auto",
                num_samples=self.num_samples,
                target_dim=1,
                feat_dynamic_real_dim=ds.num_feat_dynamic_real,
                past_feat_dynamic_real_dim=ds.num_past_feat_dynamic_real,
            )
            
            # Create predictor and generate forecasts
            predictor = model.create_predictor(batch_size=1, device=self.device)
            forecasts = predictor.predict(test_data.input)
            forecasts = list(forecasts)
            
            # Get median prediction (following test_anomaly.py)
            predictions = np.median(forecasts[0].samples, axis=0)
            
            return predictions
            
        except Exception as e:
            print(f"Error processing window {window_index}: {str(e)}")
            # Return zeros as fallback
            return np.zeros(self.win_size)

    def _pad_scores_to_original_length(self, scores, original_length, window_info):
        """
        Pad anomaly scores to match the original data length.
        
        Args:
            scores: Computed anomaly scores from windows
            original_length: Length of the original input data
            window_info: Information about windowing strategy
            
        Returns:
            padded_scores: Scores padded to original length
        """
        padded_scores = np.zeros(original_length)
        
        win_size = window_info['win_size']
        step = window_info['step']
        
        # Fill in scores from each window
        for i, score_window in enumerate(scores.reshape(-1, win_size)):
            start_idx = i * step + win_size  # Offset by win_size (context part)
            end_idx = start_idx + win_size
            
            if end_idx <= original_length:
                padded_scores[start_idx:end_idx] = score_window
            elif start_idx < original_length:
                # Partial window at the end
                remaining = original_length - start_idx
                padded_scores[start_idx:] = score_window[:remaining]
        
        # Fill beginning (context part) with first window's average
        if len(scores) > 0:
            first_score = np.mean(scores[:win_size]) if len(scores) >= win_size else np.mean(scores)
            padded_scores[:win_size] = first_score
        
        return padded_scores

    def decision_function(self, X):
        """
        Not used for zero-shot approach, present for API consistency.
        """
        return self.decision_scores_