001/* 002 * VM-Operator 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.runner.qemu; 020 021import com.fasterxml.jackson.core.JsonProcessingException; 022import com.fasterxml.jackson.databind.node.ObjectNode; 023import java.io.IOException; 024import java.nio.file.Path; 025import java.time.Duration; 026import java.time.Instant; 027import java.util.LinkedList; 028import java.util.Queue; 029import java.util.logging.Level; 030import org.jdrupes.vmoperator.runner.qemu.Constants.ProcessName; 031import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities; 032import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand; 033import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown; 034import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu; 035import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand; 036import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent; 037import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady; 038import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult; 039import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent; 040import org.jgrapes.core.Channel; 041import org.jgrapes.core.Components; 042import org.jgrapes.core.Components.Timer; 043import org.jgrapes.core.annotation.Handler; 044import org.jgrapes.core.events.Stop; 045import org.jgrapes.io.events.Closed; 046import org.jgrapes.io.events.ProcessExited; 047import org.jgrapes.net.SocketIOChannel; 048import org.jgrapes.util.events.ConfigurationUpdate; 049 050/** 051 * A component that handles the communication over the Qemu monitor 052 * socket. 053 * 054 * If the log level for this class is set to fine, the messages 055 * exchanged on the monitor socket are logged. 056 */ 057@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 058public class QemuMonitor extends QemuConnector { 059 060 private int powerdownTimeout; 061 private final Queue<QmpCommand> executing = new LinkedList<>(); 062 private Instant powerdownStartedAt; 063 private Stop suspendedStop; 064 private Timer powerdownTimer; 065 private boolean powerdownConfirmed; 066 private boolean monitorReady; 067 068 /** 069 * Instantiates a new QEMU monitor. 070 * 071 * @param componentChannel the component channel 072 * @param configDir the config dir 073 * @throws IOException Signals that an I/O exception has occurred. 074 */ 075 @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic", 076 "PMD.ConstructorCallsOverridableMethod" }) 077 public QemuMonitor(Channel componentChannel, Path configDir) 078 throws IOException { 079 super(componentChannel); 080 attach(new RamController(channel())); 081 attach(new CpuController(channel())); 082 attach(new DisplayController(channel(), configDir)); 083 attach(new CdMediaController(channel())); 084 } 085 086 /** 087 * As the initial configuration of this component depends on the 088 * configuration of the {@link Runner}, it doesn't have a handler 089 * for the {@link ConfigurationUpdate} event. The values are 090 * forwarded from the {@link Runner} instead. 091 * 092 * @param socketPath the socket path 093 * @param powerdownTimeout 094 */ 095 /* default */ void configure(Path socketPath, int powerdownTimeout) { 096 super.configure(socketPath); 097 this.powerdownTimeout = powerdownTimeout; 098 } 099 100 /** 101 * When the socket is connected, send the capabilities command. 102 */ 103 @Override 104 protected void socketConnected() { 105 rep().fire(new MonitorCommand(new QmpCapabilities())); 106 } 107 108 @Override 109 protected void processInput(String line) 110 throws IOException { 111 logger.fine(() -> "monitor(in): " + line); 112 try { 113 var response = mapper.readValue(line, ObjectNode.class); 114 if (response.has("QMP")) { 115 monitorReady = true; 116 rep().fire(new MonitorReady()); 117 return; 118 } 119 if (response.has("return") || response.has("error")) { 120 QmpCommand executed = executing.poll(); 121 logger.fine( 122 () -> String.format("(Previous \"monitor(in)\" is result " 123 + "from executing %s)", executed)); 124 rep().fire(MonitorResult.from(executed, response)); 125 return; 126 } 127 if (response.has("event")) { 128 MonitorEvent.from(response).ifPresent(rep()::fire); 129 } 130 } catch (JsonProcessingException e) { 131 throw new IOException(e); 132 } 133 } 134 135 /** 136 * On closed. 137 * 138 * @param event the event 139 */ 140 @Handler 141 public void onClosed(Closed<?> event, SocketIOChannel channel) { 142 channel.associated(this, getClass()).ifPresent(qm -> { 143 super.onClosed(event, channel); 144 logger.finer(() -> "QMP socket closed."); 145 monitorReady = false; 146 }); 147 } 148 149 /** 150 * On monitor command. 151 * 152 * @param event the event 153 * @throws IOException 154 */ 155 @Handler 156 @SuppressWarnings({ "PMD.AvoidSynchronizedStatement", 157 "PMD.AvoidDuplicateLiterals" }) 158 public void onMonitorCommand(MonitorCommand event) throws IOException { 159 // Check prerequisites 160 if (!monitorReady && !(event.command() instanceof QmpCapabilities)) { 161 logger.severe(() -> "Premature monitor command (not ready): " 162 + event.command()); 163 rep().fire(new Stop()); 164 return; 165 } 166 167 // Send the command 168 var command = event.command(); 169 logger.fine(() -> "monitor(out): " + command.toString()); 170 String asText; 171 try { 172 asText = command.asText(); 173 } catch (JsonProcessingException e) { 174 logger.log(Level.SEVERE, e, 175 () -> "Cannot serialize Json: " + e.getMessage()); 176 return; 177 } 178 synchronized (executing) { 179 if (writer().isPresent()) { 180 executing.add(command); 181 sendCommand(asText); 182 } 183 } 184 } 185 186 /** 187 * Shutdown the VM. 188 * 189 * @param event the event 190 */ 191 @Handler(priority = 100) 192 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 193 public void onStop(Stop event) { 194 if (!monitorReady) { 195 logger.fine(() -> "No QMP connection," 196 + " cannot send powerdown command"); 197 return; 198 } 199 200 // We have a connection to Qemu, attempt ACPI shutdown if time left 201 powerdownStartedAt = event.associated(Instant.class).orElseGet(() -> { 202 var now = Instant.now(); 203 event.setAssociated(Instant.class, now); 204 return now; 205 }); 206 if (powerdownStartedAt.plusSeconds(powerdownTimeout) 207 .isBefore(Instant.now())) { 208 return; 209 } 210 event.suspendHandling(); 211 suspendedStop = event; 212 213 // Send command. If not confirmed, assume "hanging" qemu process. 214 powerdownTimer = Components.schedule(t -> { 215 logger.fine(() -> "QMP powerdown command not confirmed"); 216 synchronized (this) { 217 powerdownTimer = null; 218 if (suspendedStop != null) { 219 suspendedStop.resumeHandling(); 220 suspendedStop = null; 221 } 222 } 223 }, Duration.ofSeconds(5)); 224 logger.fine(() -> "Attempting QMP (ACPI) powerdown."); 225 rep().fire(new MonitorCommand(new QmpPowerdown())); 226 } 227 228 /** 229 * When the powerdown event is confirmed, wait for termination 230 * or timeout. Termination is detected by the qemu process exiting 231 * (see {@link #onProcessExited(ProcessExited)}). 232 * 233 * @param event the event 234 */ 235 @Handler 236 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 237 public void onPowerdownEvent(PowerdownEvent event) { 238 synchronized (this) { 239 // Cancel confirmation timeout 240 if (powerdownTimer != null) { 241 powerdownTimer.cancel(); 242 } 243 244 // (Re-)schedule timer as fallback 245 var waitUntil = powerdownStartedAt.plusSeconds(powerdownTimeout); 246 logger.fine(() -> "QMP powerdown confirmed, waiting for" 247 + " termination until " + waitUntil); 248 powerdownTimer = Components.schedule(t -> { 249 logger.fine(() -> "Powerdown timeout reached."); 250 synchronized (this) { 251 powerdownTimer = null; 252 if (suspendedStop != null) { 253 suspendedStop.resumeHandling(); 254 suspendedStop = null; 255 } 256 } 257 }, waitUntil); 258 powerdownConfirmed = true; 259 } 260 } 261 262 /** 263 * On process exited. 264 * 265 * @param event the event 266 */ 267 @Handler 268 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 269 public void onProcessExited(ProcessExited event) { 270 if (!event.startedBy().associated(CommandDefinition.class) 271 .map(cd -> ProcessName.QEMU.equals(cd.name())).orElse(false)) { 272 return; 273 } 274 synchronized (this) { 275 if (powerdownTimer != null) { 276 powerdownTimer.cancel(); 277 } 278 if (suspendedStop != null) { 279 suspendedStop.resumeHandling(); 280 suspendedStop = null; 281 } 282 } 283 } 284 285 /** 286 * On configure qemu. 287 * 288 * @param event the event 289 */ 290 @Handler 291 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 292 public void onConfigureQemu(ConfigureQemu event) { 293 int newTimeout = event.configuration().vm.powerdownTimeout; 294 if (powerdownTimeout != newTimeout) { 295 powerdownTimeout = newTimeout; 296 synchronized (this) { 297 if (powerdownTimer != null && powerdownConfirmed) { 298 powerdownTimer 299 .reschedule(powerdownStartedAt.plusSeconds(newTimeout)); 300 } 301 302 } 303 } 304 } 305 306}