Background
Note: In DolphinScheduler, offline tasks have a complete lifecycle (stop, pause, resume from pause, rerun, and so on), and T+1 offline tasks are organized as a DAG (Directed Acyclic Graph).
DolphinScheduler DAG Implementation
org.apache.dolphinscheduler.common.graph.DAG
Three important data structures of DAG:
// Vertex information
private final Map<Node, NodeInfo> nodesMap;
// Edge association information: maps each vertex to its outgoing edges, which makes it possible to find leaf nodes and downstream nodes
private final Map<Node, Map<Node, EdgeInfo>> edgesMap;
// Reverse edge association information: maps each vertex to its incoming edges, which makes it quick to find nodes with an in-degree of 0 (start nodes) and to obtain upstream nodes
private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
Example below:
DAG<String, String, String> graph = new DAG<>();
graph.addNode("A", "A");
graph.addNode("B", "B");
graph.addNode("C", "C");
// Add an edge from B to C; A is still isolated (no edges yet)
graph.addEdge("B", "C");
// Adding A -> B triggers a search that starts from B and looks for a path back to A. If such a path exists, the A -> B edge is rejected because it would close a cycle; otherwise the edge is added.
graph.addEdge("A", "B");
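To make the roles of the three maps concrete, here is a minimal sketch in plain Java. It is not the DolphinScheduler class: `ThreeMapGraph`, `startNodes`, `leafNodes`, `downstreamOf` and `upstreamOf` are hypothetical names chosen for illustration, all generic types are fixed to `String`, and locking and cycle checks are left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for a graph backed by three maps.
final class ThreeMapGraph {
    // Vertex -> vertex info
    private final Map<String, String> nodesMap = new HashMap<>();
    // Vertex -> (downstream vertex -> edge info)
    private final Map<String, Map<String, String>> edgesMap = new HashMap<>();
    // Vertex -> (upstream vertex -> edge info)
    private final Map<String, Map<String, String>> reverseEdgesMap = new HashMap<>();

    void addNode(String node, String info) {
        nodesMap.put(node, info);
    }

    // Records the edge in both directions. No cycle check in this sketch.
    void addEdge(String from, String to, String edgeInfo) {
        // Assumption of this sketch: unknown nodes are registered with their name as info.
        nodesMap.putIfAbsent(from, from);
        nodesMap.putIfAbsent(to, to);
        edgesMap.computeIfAbsent(from, k -> new HashMap<>()).put(to, edgeInfo);
        reverseEdgesMap.computeIfAbsent(to, k -> new HashMap<>()).put(from, edgeInfo);
    }

    // Start nodes have in-degree 0, so they never appear as a key of reverseEdgesMap.
    Set<String> startNodes() {
        Set<String> result = new TreeSet<>(nodesMap.keySet());
        result.removeAll(reverseEdgesMap.keySet());
        return result;
    }

    // Leaf nodes have out-degree 0, so they never appear as a key of edgesMap.
    Set<String> leafNodes() {
        Set<String> result = new TreeSet<>(nodesMap.keySet());
        result.removeAll(edgesMap.keySet());
        return result;
    }

    Set<String> downstreamOf(String node) {
        return new TreeSet<>(edgesMap.getOrDefault(node, Collections.emptyMap()).keySet());
    }

    Set<String> upstreamOf(String node) {
        return new TreeSet<>(reverseEdgesMap.getOrDefault(node, Collections.emptyMap()).keySet());
    }
}
```

With the nodes A, B, C and the edges B -> C and A -> B from the example, `startNodes()` contains only A and `leafNodes()` contains only C. Keeping the reverse map costs a second write per edge, but upstream lookups then need no scan over all edges.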
Source code analysis:
org.apache.dolphinscheduler.common.graph.DAG#addEdge
public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
    lock.writeLock().lock();
    try {
        // TODO Whether the edge can be added
        if (!isLegalAddEdge(fromNode, toNode, createNode)) {
            log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
            return false;
        }
        // TODO Add nodes
        addNodeIfAbsent(fromNode, null);
        addNodeIfAbsent(toNode, null);
        // TODO Add edges
        addEdge(fromNode, toNode, edge, edgesMap);
        addEdge(toNode, fromNode, edge, reverseEdgesMap);
        return true;
    } finally {
        lock.writeLock().unlock();
    }
}
private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
    // TODO If fromNode and toNode are the same vertex, this edge cannot be added
    if (fromNode.equals(toNode)) {
        log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
        return false;
    }
    // TODO If createNode is false, fromNode and toNode must both be existing vertices
    if (!createNode) {
        if (!containsNode(fromNode) || !containsNode(toNode)) {
            log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
            return false;
        }
    }
    // Whether an edge can be successfully added(fromNode -> toNode), need to determine whether the
    // DAG has a cycle!
    // TODO Get the number of nodes
    int verticesCount = getNodesCount();
    Queue<Node> queue = new LinkedList<>();
    // TODO Put toNode into the queue
    queue.add(toNode);
    // If DAG doesn't find fromNode, it's not a cycle!
    // TODO Loop while the queue is non-empty (it always is on the first pass, because toNode was just added) and the vertex counter has not run out
    while (!queue.isEmpty() && (--verticesCount > 0)) {
        // TODO Get the element in the queue
        Node key = queue.poll();
        for (Node subsequentNode : getSubsequentNodes(key)) {
            // TODO Walk the nodes downstream of toNode. If fromNode shows up among them, a path toNode -> ... -> fromNode already exists, so adding fromNode -> toNode would close a cycle and must be rejected. Otherwise enqueue the downstream node and keep searching; e.g. with A -> B -> C, polling B yields C, and C is put into the queue
            // TODO The core idea: check whether the target node already has a path back to the source node (which would mean a cycle)
            if (subsequentNode.equals(fromNode)) {
                return false;
            }
            queue.add(subsequentNode);
        }
    }
    return true;
}
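The reachability check at the heart of isLegalAddEdge can be sketched on its own. The version below is a simplified illustration, not the DolphinScheduler code: `CycleCheckSketch` and `reachable` are hypothetical names, nodes are plain strings, and a visited set replaces the vertex-count bound of the original loop so that no node is enqueued twice.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch: reject an edge if it would close a cycle.
final class CycleCheckSketch {
    // Node -> direct downstream nodes
    private final Map<String, Set<String>> edges = new HashMap<>();

    // Breadth-first search: can `target` be reached from `start` by following edges?
    boolean reachable(String start, String target) {
        Queue<String> queue = new ArrayDeque<>();
        Set<String> visited = new HashSet<>();
        queue.add(start);
        visited.add(start);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String next : edges.getOrDefault(current, Collections.emptySet())) {
                if (next.equals(target)) {
                    return true;
                }
                // Set.add returns false for nodes that were already seen.
                if (visited.add(next)) {
                    queue.add(next);
                }
            }
        }
        return false;
    }

    // Adds from -> to unless it is a self-loop or `from` is already reachable from `to`.
    boolean addEdge(String from, String to) {
        if (from.equals(to) || reachable(to, from)) {
            return false;
        }
        edges.computeIfAbsent(from, k -> new HashSet<>()).add(to);
        return true;
    }
}
```

After adding B -> C and A -> B, a call to `addEdge("C", "A")` returns false, because the search from A reaches C through B. Adding A -> C is still allowed, since nothing is reachable from C.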
DolphinScheduler DagHelper Explanation
The DAG class is a basic general-purpose DAG tool class, and DagHelper is a business tool class that assembles task definitions and relationships between task definitions into a DAG.
org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph
public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {
    // TODO Get the task relations that belong to this process instance's definition code and version
    List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(
            workflowInstance.getProcessDefinitionCode(),
            workflowInstance.getProcessDefinitionVersion());
    // TODO Get the corresponding task definition log
    List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);
    // TODO Get TaskNode
    List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
    // generate process to get DAG info
    // TODO Parse the recovery task node codes from the command parameters
    List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());
    // TODO Parse the manually specified start nodes; by default startNodeNameList is empty
    List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());
    // TODO Build a ProcessDag object instance
    ProcessDag processDag = DagHelper.generateFlowDag(
            taskNodeList,
            startNodeNameList,
            recoveryTaskNodeCodeList,
            workflowInstance.getTaskDependType());
    if (processDag == null) {
        log.error("ProcessDag is null");
        throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null");
    }
    // TODO Generate DAG
    DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag);
    log.debug("Build dag success, dag: {}", dagGraph);
    // TODO Use WorkflowGraph to encapsulate the task node list and dagGraph
    return new WorkflowGraph(taskNodeList, dagGraph);
}
org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag
public static ProcessDag generateFlowDag(
        List<TaskNode> totalTaskNodeList,
        List<Long> startNodeNameList,
        List<Long> recoveryNodeCodeList,
        TaskDependType depNodeType) throws Exception {
    // TODO Collect the task nodes for this run, based on the start nodes, recovery nodes and dependency type
    List<TaskNode> destTaskNodeList =
            generateFlowNodeListByStartNode(
                    totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
    if (destTaskNodeList.isEmpty()) {
        return null;
    }
    // TODO Get the relationship between task nodes
    List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);
    // TODO Instantiate a ProcessDag
    ProcessDag processDag = new ProcessDag();
    // TODO Set the edges of DAG
    processDag.setEdges(taskNodeRelations);
    // TODO Set the vertices of DAG
    processDag.setNodes(destTaskNodeList);
    return processDag;
}
In short, generateFlowDag stores destTaskNodeList as the vertices and taskNodeRelations as the edges of the ProcessDag.
org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph
public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
    DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();
    // TODO Add vertices
    if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
        for (TaskNode node : processDag.getNodes()) {
            dag.addNode(node.getCode(), node);
        }
    }
    // TODO Add edges
    if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
        for (TaskNodeRelation edge : processDag.getEdges()) {
            dag.addEdge(edge.getStartNode(), edge.getEndNode());
        }
    }
    return dag;
}
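The two-step assembly described in this section (first collect nodes and relations into an intermediate holder, then add vertices followed by edges) can be sketched without any DolphinScheduler dependency. Everything in the block is hypothetical and only mirrors the shape of the code above: `Task`, `Relation` and `FlowDag` stand in for TaskNode, TaskNodeRelation and ProcessDag, and a plain adjacency map stands in for the DAG class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "collect nodes and edges, then build the graph".
final class BuildGraphSketch {

    // Stand-in for a task node: a numeric code plus a name.
    static final class Task {
        final long code;
        final String name;
        Task(long code, String name) {
            this.code = code;
            this.name = name;
        }
    }

    // Stand-in for a relation: upstream code -> downstream code.
    static final class Relation {
        final long startCode;
        final long endCode;
        Relation(long startCode, long endCode) {
            this.startCode = startCode;
            this.endCode = endCode;
        }
    }

    // Stand-in for the intermediate holder of vertices and edges.
    static final class FlowDag {
        final List<Task> nodes;
        final List<Relation> edges;
        FlowDag(List<Task> nodes, List<Relation> edges) {
            this.nodes = nodes;
            this.edges = edges;
        }
    }

    // Step 1: bundle nodes and relations; returns null for an empty node list.
    static FlowDag generateFlowDag(List<Task> tasks, List<Relation> relations) {
        if (tasks.isEmpty()) {
            return null;
        }
        return new FlowDag(new ArrayList<>(tasks), new ArrayList<>(relations));
    }

    // Step 2: add every vertex first, then every edge.
    static Map<Long, List<Long>> buildAdjacency(FlowDag flowDag) {
        Map<Long, List<Long>> adjacency = new LinkedHashMap<>();
        for (Task task : flowDag.nodes) {
            adjacency.put(task.code, new ArrayList<>());
        }
        for (Relation relation : flowDag.edges) {
            // Make sure both endpoints exist as vertices before linking them.
            adjacency.computeIfAbsent(relation.endCode, k -> new ArrayList<>());
            adjacency.computeIfAbsent(relation.startCode, k -> new ArrayList<>()).add(relation.endCode);
        }
        return adjacency;
    }
}
```

Separating the holder from the graph keeps the selection of nodes and relations independent of how the graph is stored, which is the same split the article describes between generateFlowDag and buildDagGraph.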